Repository: nifi
Updated Branches:
  refs/heads/master 8ebb4d197 -> 72ea93a65


NIFI-5868: Added instrumentation around ListFile such that all disk accesses 
are timed and any unusually long listing times or disk access operations can be 
logged. Additionally, information is logged at a debug level including 
significant amounts of troubleshooting information when configured to do so

This closes #3202.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/72ea93a6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/72ea93a6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/72ea93a6

Branch: refs/heads/master
Commit: 72ea93a657cd673bfc40ef74e172f5efb950e95d
Parents: 8ebb4d1
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Dec 5 09:37:20 2018 -0500
Committer: Bryan Bende <bbe...@apache.org>
Committed: Wed Dec 5 13:01:14 2018 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ListFile.java      | 974 +++++++++++++++++--
 .../nifi/processors/standard/TestListFile.java  | 154 ++-
 2 files changed, 1032 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/72ea93a6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index 8b73b97..42e2d3f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -27,12 +27,15 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -41,6 +44,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processor.util.list.ListedEntityTracker;
 import org.apache.nifi.processors.standard.util.FileInfo;
+import org.apache.nifi.util.Tuple;
 
 import java.io.File;
 import java.io.IOException;
@@ -57,21 +61,32 @@ import java.nio.file.attribute.PosixFilePermissions;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static 
org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR;
+import static 
org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR;
+
 @TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"file", "get", "list", "ingest", "source", "filesystem"})
@@ -115,15 +130,15 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
     static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", 
"Remote", "Input Directory is located on a remote system. State will be stored 
across the cluster so that "
         + "the listing can be performed on Primary Node Only and another node 
can pick up where the last node left off, if the Primary Node changes");
 
-    public static final PropertyDescriptor DIRECTORY = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor DIRECTORY = new Builder()
             .name("Input Directory")
             .description("The input directory from which to pull files")
             .required(true)
             
.addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(VARIABLE_REGISTRY)
             .build();
 
-    public static final PropertyDescriptor RECURSE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor RECURSE = new Builder()
             .name("Recurse Subdirectories")
             .description("Indicates whether to list files from subdirectories 
of the directory")
             .required(true)
@@ -131,7 +146,7 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
             .defaultValue("true")
             .build();
 
-    public static final PropertyDescriptor DIRECTORY_LOCATION = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor DIRECTORY_LOCATION = new Builder()
             .name("Input Directory Location")
             .description("Specifies where the Input Directory is located. This 
is used to determine whether state should be stored locally or across the 
cluster.")
             .allowableValues(LOCATION_LOCAL, LOCATION_REMOTE)
@@ -139,7 +154,7 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
             .required(true)
             .build();
 
-    public static final PropertyDescriptor FILE_FILTER = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor FILE_FILTER = new Builder()
             .name("File Filter")
             .description("Only files whose names match the given regular 
expression will be picked up")
             .required(true)
@@ -147,14 +162,14 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor PATH_FILTER = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PATH_FILTER = new Builder()
             .name("Path Filter")
             .description("When " + RECURSE.getName() + " is true, then only 
subdirectories whose path matches the given regular expression will be scanned")
             .required(false)
             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor INCLUDE_FILE_ATTRIBUTES = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor INCLUDE_FILE_ATTRIBUTES = new 
Builder()
         .name("Include File Attributes")
         .description("Whether or not to include information such as the file's 
Last Modified Time and Owner as FlowFile Attributes. "
             + "Depending on the File System being used, gathering this 
information can be expensive and as a result should be disabled. This is 
especially true of remote file shares.")
@@ -163,22 +178,22 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
         .required(true)
         .build();
 
-    public static final PropertyDescriptor MIN_AGE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor MIN_AGE = new Builder()
             .name("Minimum File Age")
             .description("The minimum age that a file must be in order to be 
pulled; any file younger than this amount of time (according to last 
modification date) will be ignored")
             .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .addValidator(TIME_PERIOD_VALIDATOR)
             .defaultValue("0 sec")
             .build();
 
-    public static final PropertyDescriptor MAX_AGE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor MAX_AGE = new Builder()
             .name("Maximum File Age")
             .description("The maximum age that a file can be in order to be 
pulled; any file older than this amount of time (according to last modification 
date) will be ignored")
             .required(false)
             .addValidator(StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
             .build();
 
-    public static final PropertyDescriptor MIN_SIZE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor MIN_SIZE = new Builder()
             .name("Minimum File Size")
             .description("The minimum size that a file must be in order to be 
pulled")
             .required(true)
@@ -186,14 +201,14 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
             .defaultValue("0 B")
             .build();
 
-    public static final PropertyDescriptor MAX_SIZE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor MAX_SIZE = new Builder()
             .name("Maximum File Size")
             .description("The maximum size that a file can be in order to be 
pulled")
             .required(false)
             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new Builder()
             .name("Ignore Hidden Files")
             .description("Indicates whether or not hidden files should be 
ignored")
             .allowableValues("true", "false")
@@ -201,11 +216,63 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
             .required(true)
             .build();
 
+    public static final PropertyDescriptor TRACK_PERFORMANCE = new Builder()
+        .name("track-performance")
+        .displayName("Track Performance")
+        .description("Whether or not the Processor should track the 
performance of disk access operations. If true, all accesses to disk will be 
recorded, including the file being accessed, the " +
+            "information being obtained, and how long it takes. This is then 
logged periodically at a DEBUG level. While the amount of data will be capped, 
" +
+            "this option may still consume a significant amount of heap 
(controlled by the 'Maximum Number of Files to Track' property), " +
+            "but it can be very useful for troubleshooting purposes if 
performance is poor or degraded.")
+        .required(true)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
+
+    public static final PropertyDescriptor MAX_TRACKED_FILES = new Builder()
+        .name("max-performance-metrics")
+        .displayName("Maximum Number of Files to Track")
+        .description("If the 'Track Performance' property is set to 'true', 
this property indicates the maximum number of files whose performance metrics 
should be held onto. A smaller value for " +
+            "this property will result in less heap utilization, while a 
larger value may provide more accurate insights into how the disk access 
operations are performing")
+        .required(true)
+        .addValidator(POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .defaultValue("100000")
+        .build();
+
+    public static final PropertyDescriptor MAX_DISK_OPERATION_TIME = new 
Builder()
+        .name("max-operation-time")
+        .displayName("Max Disk Operation Time")
+        .description("The maximum amount of time that any single disk 
operation is expected to take. If any disk operation takes longer than this 
amount of time, a warning bulletin will be " +
+            "generated for each operation that exceeds this amount of time.")
+        .required(false)
+        .addValidator(TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
+        .defaultValue("10 secs")
+        .build();
+
+    public static final PropertyDescriptor MAX_LISTING_TIME = new Builder()
+        .name("max-listing-time")
+        .displayName("Max Directory Listing Time")
+        .description("The maximum amount of time that listing any single 
directory is expected to take. If the listing for the directory specified by 
the 'Input Directory' property, " +
+            "or the listing of any subdirectory (if 'Recurse' is set to true) 
takes longer than this amount of time, a warning bulletin will be generated for 
each directory listing " +
+            "that exceeds this amount of time.")
+        .required(false)
+        .addValidator(TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
+        .defaultValue("3 mins")
+        .build();
+
+
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
 
+    private volatile ScheduledExecutorService monitoringThreadPool;
+    private volatile Future<?> monitoringFuture;
+
     private volatile boolean includeFileAttributes;
-    private final AtomicReference<BiPredicate<Path, BasicFileAttributes>> 
fileFilterRef = new AtomicReference<BiPredicate<Path, BasicFileAttributes>>();
+    private volatile PerformanceTracker performanceTracker;
+    private volatile long performanceLoggingTimestamp = 
System.currentTimeMillis();
+    private final AtomicReference<BiPredicate<Path, BasicFileAttributes>> 
fileFilterRef = new AtomicReference<>();
 
     public static final String FILE_CREATION_TIME_ATTRIBUTE = 
"file.creationTime";
     public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = 
"file.lastModifiedTime";
@@ -219,8 +286,8 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
     @Override
     protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(LISTING_STRATEGY);
         properties.add(DIRECTORY);
+        properties.add(LISTING_STRATEGY);
         properties.add(RECURSE);
         properties.add(DIRECTORY_LOCATION);
         properties.add(FILE_FILTER);
@@ -236,11 +303,23 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
         properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW);
         properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET);
         properties.add(ListedEntityTracker.NODE_IDENTIFIER);
+        properties.add(TRACK_PERFORMANCE);
+        properties.add(MAX_TRACKED_FILES);
+        properties.add(MAX_DISK_OPERATION_TIME);
+        properties.add(MAX_LISTING_TIME);
         this.properties = Collections.unmodifiableList(properties);
 
         final Set<Relationship> relationships = new HashSet<>();
         relationships.add(REL_SUCCESS);
         this.relationships = Collections.unmodifiableSet(relationships);
+
+        monitoringThreadPool = Executors.newScheduledThreadPool(1, r -> {
+            final Thread t = Executors.defaultThreadFactory().newThread(r);
+            t.setName("Monitor ListFile Performance [UUID=" + 
context.getIdentifier() + "]");
+            t.setDaemon(true);
+
+            return t;
+        });
     }
 
     @Override
@@ -257,8 +336,76 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
     public void onScheduled(final ProcessContext context) {
         fileFilterRef.set(createFileFilter(context));
         includeFileAttributes = 
context.getProperty(INCLUDE_FILE_ATTRIBUTES).asBoolean();
+
+        final long maxDiskOperationMillis = 
context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maxListingMillis = 
context.getProperty(MAX_LISTING_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final boolean trackPerformance = 
context.getProperty(TRACK_PERFORMANCE).asBoolean();
+        if (trackPerformance) {
+            final int maxEntries = 
context.getProperty(MAX_TRACKED_FILES).evaluateAttributeExpressions().asInteger();
+            performanceTracker = new 
RollingMetricPerformanceTracker(getLogger(), maxDiskOperationMillis, 
maxEntries);
+        } else {
+            performanceTracker = new UntrackedPerformanceTracker(getLogger(), 
maxDiskOperationMillis);
+        }
+
+        final long millisToKeepStats = TimeUnit.MINUTES.toMillis(15);
+        final MonitorActiveTasks monitorTask = new 
MonitorActiveTasks(performanceTracker, getLogger(), maxDiskOperationMillis, 
maxListingMillis, millisToKeepStats);
+        monitoringFuture = 
monitoringThreadPool.scheduleAtFixedRate(monitorTask, 15, 15, TimeUnit.SECONDS);
+    }
+
+    @OnStopped
+    public void onStopped(final ProcessContext context) {
+        if (monitoringFuture != null) {
+            monitoringFuture.cancel(true);
+        }
+
+        final boolean trackPerformance = 
context.getProperty(TRACK_PERFORMANCE).asBoolean();
+        if (trackPerformance) {
+            logPerformance();
+        }
+    }
+
+    protected PerformanceTracker getPerformanceTracker() {
+        return performanceTracker;
     }
 
+    public void logPerformance() {
+        final ComponentLog logger = getLogger();
+        if (!logger.isDebugEnabled()) {
+            return;
+        }
+
+        final long earliestTimestamp = 
performanceTracker.getEarliestTimestamp();
+        final long millis = System.currentTimeMillis() - earliestTimestamp;
+        final long seconds = TimeUnit.MILLISECONDS.toSeconds(millis);
+
+        for (final DiskOperation operation : DiskOperation.values()) {
+            final OperationStatistics stats = 
performanceTracker.getOperationStatistics(operation);
+
+            final StringBuilder sb = new StringBuilder();
+            if (stats.getCount() == 0) {
+                sb.append("Over the past ").append(seconds).append(" seconds, 
for Operation '").append(operation).append("' there were no operations 
performed");
+            } else {
+                sb.append("Over the past ").append(seconds).append(" seconds, 
For Operation '").append(operation).append("' there were ")
+                    .append(stats.getCount()).append(" operations performed 
with an average time of ")
+                    .append(stats.getAverage()).append(" milliseconds; 
Standard Deviation = ").append(stats.getStandardDeviation()).append(" millis; 
Min Time = ")
+                    .append(stats.getMin()).append(" millis, Max Time = 
").append(stats.getMax()).append(" millis");
+
+                if (logger.isDebugEnabled()) {
+                    final Map<String, Long> outliers = stats.getOutliers();
+
+                    sb.append("; 
").append(stats.getOutliers().size()).append(" significant outliers: ");
+                    sb.append(outliers);
+                }
+            }
+
+            logger.debug(sb.toString());
+        }
+
+        performanceLoggingTimestamp = System.currentTimeMillis();
+    }
+
+
     @Override
     protected Map<String, String> createAttributes(final FileInfo fileInfo, 
final ProcessContext context) {
         final Map<String, String> attributes = new HashMap<>();
@@ -284,34 +431,43 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
         attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new 
Date(fileInfo.getLastModifiedTime())));
 
         if (includeFileAttributes) {
+            final TimingInfo timingInfo = 
performanceTracker.getTimingInfo(relativePath.toString(), file.getName());
+
             try {
                 FileStore store = Files.getFileStore(filePath);
-                if (store.supportsFileAttributeView("basic")) {
-                    try {
-                        BasicFileAttributeView view = 
Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
-                        BasicFileAttributes attrs = view.readAttributes();
-                        attributes.put(FILE_CREATION_TIME_ATTRIBUTE, 
formatter.format(new Date(attrs.creationTime().toMillis())));
-                        attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, 
formatter.format(new Date(attrs.lastAccessTime().toMillis())));
-                    } catch (Exception ignore) {
-                    } // allow other attributes if these fail
-                }
 
-                if (store.supportsFileAttributeView("owner")) {
-                    try {
-                        FileOwnerAttributeView view = 
Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
-                        attributes.put(FILE_OWNER_ATTRIBUTE, 
view.getOwner().getName());
-                    } catch (Exception ignore) {
-                    } // allow other attributes if these fail
-                }
+                
timingInfo.timeOperation(DiskOperation.RETRIEVE_BASIC_ATTRIBUTES, () -> {
+                    if (store.supportsFileAttributeView("basic")) {
+                        try {
+                            BasicFileAttributeView view = 
Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
+                            BasicFileAttributes attrs = view.readAttributes();
+                            attributes.put(FILE_CREATION_TIME_ATTRIBUTE, 
formatter.format(new Date(attrs.creationTime().toMillis())));
+                            attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, 
formatter.format(new Date(attrs.lastAccessTime().toMillis())));
+                        } catch (Exception ignore) {
+                        } // allow other attributes if these fail
+                    }
+                });
 
-                if (store.supportsFileAttributeView("posix")) {
-                    try {
-                        PosixFileAttributeView view = 
Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
-                        attributes.put(FILE_PERMISSIONS_ATTRIBUTE, 
PosixFilePermissions.toString(view.readAttributes().permissions()));
-                        attributes.put(FILE_GROUP_ATTRIBUTE, 
view.readAttributes().group().getName());
-                    } catch (Exception ignore) {
-                    } // allow other attributes if these fail
-                }
+                
timingInfo.timeOperation(DiskOperation.RETRIEVE_OWNER_ATTRIBUTES, () -> {
+                    if (store.supportsFileAttributeView("owner")) {
+                        try {
+                            FileOwnerAttributeView view = 
Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
+                            attributes.put(FILE_OWNER_ATTRIBUTE, 
view.getOwner().getName());
+                        } catch (Exception ignore) {
+                        } // allow other attributes if these fail
+                    }
+                });
+
+                
timingInfo.timeOperation(DiskOperation.RETRIEVE_POSIX_ATTRIBUTES, () -> {
+                    if (store.supportsFileAttributeView("posix")) {
+                        try {
+                            PosixFileAttributeView view = 
Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
+                            attributes.put(FILE_PERMISSIONS_ATTRIBUTE, 
PosixFilePermissions.toString(view.readAttributes().permissions()));
+                            attributes.put(FILE_GROUP_ATTRIBUTE, 
view.readAttributes().group().getName());
+                        } catch (Exception ignore) {
+                        } // allow other attributes if these fail
+                    }
+                });
             } catch (IOException ioe) {
                 // well then this FlowFile gets none of these attributes
                 getLogger().warn("Error collecting attributes for file {}, 
message is {}", new Object[] {absPathString, ioe.getMessage()});
@@ -338,36 +494,94 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
 
     @Override
     protected List<FileInfo> performListing(final ProcessContext context, 
final Long minTimestamp) throws IOException {
-        final Path path = new File(getPath(context)).toPath();
+        final Path basePath = new File(getPath(context)).toPath();
         final Boolean recurse = context.getProperty(RECURSE).asBoolean();
         final Map<Path, BasicFileAttributes> lastModifiedMap = new HashMap<>();
 
         final BiPredicate<Path, BasicFileAttributes> fileFilter = 
fileFilterRef.get();
         int maxDepth = recurse ? Integer.MAX_VALUE : 1;
-        BiPredicate<Path, BasicFileAttributes> matcher = (p, attributes) -> {
-            if (!attributes.isDirectory()
-                    && (minTimestamp == null || 
attributes.lastModifiedTime().toMillis() >= minTimestamp)
-                    && fileFilter.test(p, attributes)) {
-                // We store the attributes for each Path we are returning in 
order to avoid to
-                // retrieve them again later when creating the FileInfo
-                lastModifiedMap.put(p, attributes);
-                return true;
-            }
-            return false;
+
+        final BiPredicate<Path, BasicFileAttributes> matcher = new 
BiPredicate<Path, BasicFileAttributes>() {
+            private long lastTimestamp = System.currentTimeMillis();
+
+            @Override
+            public boolean test(final Path path, final BasicFileAttributes 
attributes) {
+                if (!isScheduled()) {
+                    throw new ProcessorStoppedException();
+                }
+
+                final long now = System.currentTimeMillis();
+                final long timeToList = now - lastTimestamp;
+                lastTimestamp = now;
+
+                final Path relativeDirectory = 
basePath.relativize(path).getParent();
+                final String relativePath = relativeDirectory == null ? "" : 
relativeDirectory.toString();
+                final String filename = path.getFileName().toString();
+                
performanceTracker.acceptOperation(DiskOperation.RETRIEVE_NEXT_FILE_FROM_OS, 
relativePath, filename, timeToList);
+
+                final boolean isDirectory = attributes.isDirectory();
+                if (isDirectory) {
+                    performanceTracker.setActiveDirectory(relativePath);
+                }
+
+                final TimedOperationKey operationKey = 
performanceTracker.beginOperation(DiskOperation.FILTER, relativePath, filename);
+
+                try {
+                    if (!isDirectory && (minTimestamp == null || 
attributes.lastModifiedTime().toMillis() >= minTimestamp)
+                        && fileFilter.test(path, attributes)) {
+                        // We store the attributes for each Path we are 
returning in order to avoid to
+                        // retrieve them again later when creating the FileInfo
+                        lastModifiedMap.put(path, attributes);
+
+                        return true;
+                    }
+
+                    return false;
+                } finally {
+                    performanceTracker.completeOperation(operationKey);
+
+                    if 
(TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - 
performanceLoggingTimestamp) >= 5) {
+                        logPerformance();
+                    }
+                }
+            }
         };
-        Stream<Path> inputStream = Files.find(path, maxDepth, matcher, 
FileVisitOption.FOLLOW_LINKS);
-        Stream<FileInfo> listing = inputStream.map(p -> {
+
+        final Stream<Path> inputStream = getPathStream(basePath, maxDepth, 
matcher);
+
+        final Stream<FileInfo> listing = inputStream.map(p -> {
             File file = p.toFile();
             BasicFileAttributes attributes = lastModifiedMap.get(p);
-            return new FileInfo.Builder()
+
+            final FileInfo fileInfo = new FileInfo.Builder()
                 .directory(false)
                 .filename(file.getName())
                 .fullPathFileName(file.getAbsolutePath())
                 .lastModifiedTime(attributes.lastModifiedTime().toMillis())
                 .size(attributes.size())
                 .build();
+
+            return fileInfo;
         });
-        return listing.collect(Collectors.toList());
+
+        // Perform the actual listing
+        try {
+            final long start = System.currentTimeMillis();
+            final List<FileInfo> fileInfos = 
listing.collect(Collectors.toList());
+            final long millis = System.currentTimeMillis() - start;
+
+            getLogger().debug("Took {} milliseconds to perform listing and 
gather {} entries", new Object[] {millis, fileInfos.size()});
+            return fileInfos;
+        } catch (final ProcessorStoppedException pse) {
+            getLogger().info("Processor was stopped so will not complete 
listing of Files");
+            return Collections.emptyList();
+        } finally {
+            performanceTracker.completeActiveDirectory();
+        }
+    }
+
+    protected Stream<Path> getPathStream(final Path basePath, final int 
maxDepth, final BiPredicate<Path, BasicFileAttributes> matcher) throws 
IOException {
+        return Files.find(basePath, maxDepth, matcher, 
FileVisitOption.FOLLOW_LINKS);
     }
 
     @Override
@@ -389,11 +603,15 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
         final long minAge = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final Long maxAge = 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final boolean ignoreHidden = 
context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
-        final Pattern filePattern = 
Pattern.compile(context.getProperty(FILE_FILTER).getValue());
+        final String fileFilter = context.getProperty(FILE_FILTER).getValue();
+        final Pattern filePattern = Pattern.compile(fileFilter);
         final String indir = 
context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
         final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
         final String pathPatternStr = 
context.getProperty(PATH_FILTER).getValue();
         final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? 
null : Pattern.compile(pathPatternStr);
+
+        final Path basePath = Paths.get(indir);
+
         return (path, attributes) -> {
             if (minSize > attributes.size()) {
                 return false;
@@ -408,22 +626,652 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
             if (maxAge != null && maxAge < fileAge) {
                 return false;
             }
-            if (ignoreHidden && path.toFile().isHidden()) {
-                return false;
-            }
+
+            final Path relativePath = basePath.relativize(path).getParent();
+            final String relativeDir = relativePath == null ? "" : 
relativePath.toString();
+            final String filename = path.getFileName().toString();
+            final TimingInfo timingInfo = 
performanceTracker.getTimingInfo(relativeDir, filename);
+
+            final File file = path.toFile();
+
             if (pathPattern != null) {
-                Path reldir = Paths.get(indir).relativize(path).getParent();
-                if (reldir != null && !reldir.toString().isEmpty()) {
-                    if (!pathPattern.matcher(reldir.toString()).matches()) {
+                if (relativePath != null && 
!relativePath.toString().isEmpty()) {
+                    if 
(!pathPattern.matcher(relativePath.toString()).matches()) {
                         return false;
                     }
                 }
             }
+
+            final boolean matchesFilter = 
filePattern.matcher(filename).matches();
+            if (!matchesFilter) {
+                return false;
+            }
+
             // Verify that we have at least read permissions on the file we're 
considering grabbing
-            if (!Files.isReadable(path)) {
+            if (!timingInfo.timeOperation(DiskOperation.CHECK_READABLE, () -> 
Files.isReadable(path))) {
+                return false;
+            }
+
+            if (ignoreHidden && 
timingInfo.timeOperation(DiskOperation.CHECK_HIDDEN, file::isHidden)) {
                 return false;
             }
-            return 
filePattern.matcher(path.getFileName().toString()).matches();
+
+            return true;
         };
     }
+
+    /**
+     * A PerformanceTracker that is capable of tracking which disk access 
operation is active and which directory is actively being listed,
+     * as well as timing specific operations, but does not track metrics over 
any amount of time. This implementation does not provide the ability
+     * to glean information such as which operations or files are taking the 
longest to operate on but uses very little heap.
+     */
+    public static class UntrackedPerformanceTracker implements 
PerformanceTracker {
+        private TimedOperationKey activeOperation = null;
+        private String activeDirectory;
+        private long activeDirectoryStartTime = -1L;
+
+        private final ComponentLog logger;
+        private final long maxDiskOperationMillis;
+
+        public UntrackedPerformanceTracker(final ComponentLog logger, final 
long maxDiskOperationMillis) {
+            this.logger = logger;
+            this.maxDiskOperationMillis = maxDiskOperationMillis;
+        }
+
+        @Override
+        public TimedOperationKey beginOperation(final DiskOperation operation, 
final String directory, final String filename) {
+            return null;
+        }
+
+        @Override
+        public void completeOperation(final TimedOperationKey operationKey) {
+        }
+
+        @Override
+        public void acceptOperation(final DiskOperation operation, final 
String directory, final String filename, final long millis) {
+        }
+
+        @Override
+        public TimingInfo getTimingInfo(final String directory, final String 
filename) {
+            return new TimingInfo(directory, filename, this, logger, 
maxDiskOperationMillis);
+        }
+
+        @Override
+        public OperationStatistics getOperationStatistics(final DiskOperation 
operation) {
+            return OperationStatistics.EMPTY;
+        }
+
+        @Override
+        public synchronized void setActiveOperation(final TimedOperationKey 
operationKey) {
+            this.activeOperation = operationKey;
+        }
+
+        @Override
+        public synchronized void completeActiveOperation() {
+            this.activeOperation = null;
+        }
+
+        @Override
+        public synchronized TimedOperationKey getActiveOperation() {
+            return activeOperation;
+        }
+
+        @Override
+        public void purgeTimingInfo(final long cutoff) {
+        }
+
+        @Override
+        public long getEarliestTimestamp() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public synchronized void setActiveDirectory(final String directory) {
+            activeDirectory = directory;
+            activeDirectoryStartTime = System.currentTimeMillis();
+        }
+
+        @Override
+        public synchronized void completeActiveDirectory() {
+            activeDirectory = null;
+            activeDirectoryStartTime = -1L;
+        }
+
+        @Override
+        public synchronized long getActiveDirectoryStartTime() {
+            return activeDirectoryStartTime;
+        }
+
+        @Override
+        public synchronized String getActiveDirectory() {
+            return activeDirectory;
+        }
+
+        @Override
+        public int getTrackedFileCount() {
+            return 0;
+        }
+    }
+
+    /**
+     * Tracks metrics using a rolling window of time, in which older metrics 
are 'aged off' by calling {@link #purgeTimingInfo(long)}. Tracking these 
metrics allows information
+     * to be gleaned, such as which files are expensive to operate on or which 
operations are most expensive. However, the heap utilization is significant.
+     */
+    public static final class RollingMetricPerformanceTracker implements 
PerformanceTracker {
+        private final Map<String, String> directoryCanonicalization = new 
HashMap<>();
+        private final Map<Tuple<String, String>, TimingInfo> 
directoryToTimingInfo;
+        private TimedOperationKey activeOperation;
+        private long earliestTimestamp = System.currentTimeMillis();
+        private final long maxDiskOperationMillis;
+        private final ComponentLog logger;
+
+        private String activeDirectory;
+        private long activeDirectoryStartTime = -1L;
+
+        public RollingMetricPerformanceTracker(final ComponentLog logger, 
final long maxDiskOperationMillis, final int maxEntries) {
+            this.logger = logger;
+            this.maxDiskOperationMillis = maxDiskOperationMillis;
+
+            directoryToTimingInfo = new LinkedHashMap<Tuple<String, String>, 
TimingInfo>() {
+                @Override
+                protected boolean removeEldestEntry(final 
Map.Entry<Tuple<String, String>, TimingInfo> eldest) {
+                    return size() > maxEntries;
+                }
+            };
+        }
+
+        @Override
+        public synchronized TimedOperationKey beginOperation(final 
DiskOperation operation, final String directory, final String filename) {
+            return new TimedOperationKey(operation, directory, filename, 
System.currentTimeMillis());
+        }
+
+        @Override
+        public synchronized void completeOperation(final TimedOperationKey 
operationKey) {
+            final TimingInfo timingInfo = 
getTimingInfo(operationKey.getDirectory(), operationKey.getFilename());
+            timingInfo.accept(operationKey.getOperation(), 
System.currentTimeMillis() - operationKey.getStartTime());
+        }
+
+        @Override
+        public synchronized void acceptOperation(final DiskOperation 
operation, final String directory, final String filename, final long millis) {
+            final String canonicalDirectory = 
directoryCanonicalization.computeIfAbsent(directory, key -> directory);
+            final Tuple<String, String> key = new Tuple<>(canonicalDirectory, 
filename);
+            final TimingInfo timingInfo = 
directoryToTimingInfo.computeIfAbsent(key, k -> new TimingInfo(directory, 
filename, this, logger, maxDiskOperationMillis));
+            timingInfo.accept(operation, millis);
+        }
+
+        @Override
+        public synchronized TimingInfo getTimingInfo(final String directory, 
final String filename) {
+            final String canonicalDirectory = 
directoryCanonicalization.computeIfAbsent(directory, key -> directory);
+            final Tuple<String, String> key = new Tuple<>(canonicalDirectory, 
filename);
+            final TimingInfo timingInfo = 
directoryToTimingInfo.computeIfAbsent(key, k -> new TimingInfo(directory, 
filename, this, logger, maxDiskOperationMillis));
+
+            return timingInfo;
+        }
+
+        @Override
+        public void setActiveOperation(final TimedOperationKey 
activeOperation) {
+            this.activeOperation = activeOperation;
+        }
+
+        @Override
+        public void completeActiveOperation() {
+            this.activeOperation = null;
+        }
+
+        @Override
+        public synchronized TimedOperationKey getActiveOperation() {
+            return activeOperation;
+        }
+
+        @Override
+        public synchronized void setActiveDirectory(final String directory) {
+            activeDirectory = directory;
+            activeDirectoryStartTime = System.currentTimeMillis();
+        }
+
+        @Override
+        public synchronized void completeActiveDirectory() {
+            activeDirectory = null;
+            activeDirectoryStartTime = -1L;
+        }
+
+        @Override
+        public synchronized long getActiveDirectoryStartTime() {
+            return activeDirectoryStartTime;
+        }
+
+        @Override
+        public synchronized String getActiveDirectory() {
+            return activeDirectory;
+        }
+
+        @Override
+        public synchronized int getTrackedFileCount() {
+            return directoryToTimingInfo.size();
+        }
+
+        @Override
+        public synchronized void purgeTimingInfo(final long cutoff) {
+            logger.debug("Purging any entries from Performance Tracker that is 
older than {}", new Object[] {new Date(cutoff)});
+            final Iterator<Map.Entry<Tuple<String, String>, TimingInfo>> itr = 
directoryToTimingInfo.entrySet().iterator();
+
+            int purgedCount = 0;
+            long earliestTimestamp = System.currentTimeMillis();
+            while (itr.hasNext()) {
+                final Map.Entry<Tuple<String, String>, TimingInfo> entry = 
itr.next();
+                final TimingInfo timingInfo = entry.getValue();
+                final long creationTime = timingInfo.getCreationTimestamp();
+
+                if (creationTime < cutoff) {
+                    itr.remove();
+                    purgedCount++;
+
+                    directoryCanonicalization.remove(entry.getKey().getKey());
+                } else {
+                    earliestTimestamp = Math.min(earliestTimestamp, 
creationTime);
+                }
+            }
+
+            this.earliestTimestamp = earliestTimestamp;
+            logger.debug("Purged {} entries from Performance Tracker; now 
holding {} entries", new Object[] {purgedCount, directoryToTimingInfo.size()});
+        }
+
+        public long getEarliestTimestamp() {
+            return earliestTimestamp;
+        }
+
+        public synchronized OperationStatistics getOperationStatistics(final 
DiskOperation operation) {
+            long count = 0L;
+            long sum = 0L;
+            long min = 0L;
+            long max = 0L;
+
+            // Calculate min/max/mean
+            for (final TimingInfo timingInfo : directoryToTimingInfo.values()) 
{
+                final long operationTime = 
timingInfo.getOperationTime(operation);
+
+                if (operationTime < 0) { // operation not conducted
+                    continue;
+                }
+
+                sum += operationTime;
+
+                if (count++ == 0) {
+                    min = operationTime;
+                    max = operationTime;
+                } else {
+                    min = Math.min(min, operationTime);
+                    max = Math.max(max, operationTime);
+                }
+            }
+
+            if (count == 0) {
+                return OperationStatistics.EMPTY;
+            }
+
+            double average = (double) sum / (double) count;
+
+            // Calculate Standard Deviation
+            final double stdDeviation = calculateStdDev(average, (double) 
count, operation);
+            final double outlierCutoff = average + 2 * stdDeviation;
+
+            final Map<String, Long> outliers = new HashMap<>();
+            for (final TimingInfo timingInfo : directoryToTimingInfo.values()) 
{
+                final long operationTime = 
timingInfo.getOperationTime(operation);
+
+                if (operationTime > 2 && operationTime > outlierCutoff) {
+                    final String directory = timingInfo.getDirectory();
+                    final String filename = timingInfo.getFilename();
+                    final String fullPath = directory.endsWith("/") ? 
directory + filename : directory + "/" + filename;
+                    outliers.put(fullPath, operationTime);
+                }
+            }
+
+            return new StandardOperationStatistics(min, max, count, average, 
stdDeviation, outliers);
+        }
+
+        private double calculateStdDev(final double average, final double 
count, final DiskOperation operation) {
+            double squaredDifferenceSum = 0D;
+            for (final TimingInfo timingInfo : directoryToTimingInfo.values()) 
{
+                final long operationTime = 
timingInfo.getOperationTime(operation);
+                if (operationTime < 0) {
+                    continue;
+                }
+
+                final double differenceSquared = Math.pow(((double) 
operationTime - average), 2);
+                squaredDifferenceSum += differenceSquared;
+            }
+
+            final double squaredDifferenceAverage = squaredDifferenceSum / 
count;
+            final double stdDeviation = Math.pow(squaredDifferenceAverage, 
0.5);
+            return stdDeviation;
+        }
+    }
+
    /**
     * Provides a mechanism for timing how long a particular operation takes to complete, logging if it takes longer
     * than the configured threshold. One TimingInfo exists per (directory, filename) pair being timed.
     */
    private static class TimingInfo {
        private final String directory;
        private final String filename;
        // One slot per DiskOperation ordinal; -1 indicates the operation has not been performed for this file.
        private final int[] operationTimes;
        private final PerformanceTracker tracker;
        private final long creationTimestamp;
        private final ComponentLog logger;
        private final long maxDiskOperationMillis;

        public TimingInfo(final String directory, final String filename, final PerformanceTracker tracker, final ComponentLog logger, final long maxDiskOperationMillis) {
            this.directory = directory;
            this.filename = filename;
            this.tracker = tracker;
            this.logger = logger;
            this.maxDiskOperationMillis = maxDiskOperationMillis;

            this.creationTimestamp = System.currentTimeMillis();

            operationTimes = new int[DiskOperation.values().length];
            Arrays.fill(operationTimes, -1);
        }

        public String getDirectory() {
            return directory;
        }

        public String getFilename() {
            return filename;
        }

        /**
         * Records the duration of the given operation, warning if it exceeded the configured threshold.
         * The duration is stored as an int to keep the per-file footprint small; durations are expected
         * to be far below Integer.MAX_VALUE milliseconds.
         */
        public void accept(final DiskOperation operation, final long duration) {
            operationTimes[operation.ordinal()] = (int) duration;

            if (duration > maxDiskOperationMillis) {
                final String fullPath = getFullPath();
                logger.warn("This Processor completed action {} on {} in {} milliseconds, which exceeds the configured threshold of {} milliseconds",
                    new Object[] {operation, fullPath, duration, maxDiskOperationMillis});
            }

            if (logger.isTraceEnabled()) {
                logger.trace("Performing operation {} on {} took {} milliseconds", new Object[] {operation, getFullPath(), duration});
            }
        }

        // Joins directory and filename, avoiding a doubled separator when the directory already ends with '/'.
        private String getFullPath() {
            if (directory.isEmpty()) {
                return filename;
            } else {
                return directory.endsWith("/") ? directory + filename : directory + "/" + filename;
            }
        }

        /**
         * @return the recorded time for the given operation in milliseconds, or -1 if the operation has not been performed
         */
        public long getOperationTime(final DiskOperation operation) {
            return operationTimes[operation.ordinal()];
        }

        /**
         * Invokes the given function, recording how long it took and registering it with the tracker as the
         * active operation for the duration of the call, so that a monitoring task can observe long-running work.
         *
         * @return whatever the function returns
         */
        private <T> T timeOperation(final DiskOperation operation, final Supplier<T> function) {
            final long start = System.currentTimeMillis();
            final TimedOperationKey operationKey = new TimedOperationKey(operation, directory, filename, start);
            tracker.setActiveOperation(operationKey);

            try {
                final T value = function.get();
                final long millis = System.currentTimeMillis() - start;
                accept(operation, millis);
                return value;
            } finally {
                tracker.completeActiveOperation();
            }
        }

        /**
         * Runs the given task, recording how long it took and registering it with the tracker as the active
         * operation. If the task throws, no duration is recorded, but the active operation is still cleared.
         */
        private void timeOperation(final DiskOperation operation, final Runnable task) {
            final long start = System.currentTimeMillis();
            final TimedOperationKey operationKey = new TimedOperationKey(operation, directory, filename, start);
            tracker.setActiveOperation(operationKey);

            try {
                task.run();
                final long millis = System.currentTimeMillis() - start;
                accept(operation, millis);
            } finally {
                tracker.completeActiveOperation();
            }
        }

        public long getCreationTimestamp() {
            return creationTimestamp;
        }
    }
+
    /**
     * PerformanceTracker is responsible for providing a mechanism by which any disk operation can be timed and the timing information
     * can both be used to issue warnings as well as be aggregated for some amount of time, in order to understand how long certain disk operations
     * take and which files may be responsible for causing longer-than-usual operations to be performed.
     */
    interface PerformanceTracker {
        /**
         * Signals that the given operation is starting against the given directory/file.
         *
         * @return a key identifying the started operation, or {@code null} if this tracker does not record operations
         */
        TimedOperationKey beginOperation(DiskOperation operation, String directory, String filename);

        /**
         * Signals that the operation identified by the given key has completed, so that its duration can be recorded.
         */
        void completeOperation(TimedOperationKey operationKey);

        /**
         * Records that the given operation took the given number of milliseconds for the given directory/file.
         */
        void acceptOperation(DiskOperation operation, String directory, String filename, long millis);

        /**
         * Returns the TimingInfo to use for timing operations against the given directory/file.
         */
        TimingInfo getTimingInfo(String directory, String filename);

        /**
         * Returns aggregated statistics (min/max/count/average/std-dev/outliers) for the given operation.
         */
        OperationStatistics getOperationStatistics(DiskOperation operation);

        /**
         * Records the operation currently in progress so that a monitoring task can report on it.
         */
        void setActiveOperation(TimedOperationKey operationKey);

        /** Clears the currently in-progress operation. */
        void completeActiveOperation();

        /** @return the operation currently in progress, or {@code null} if none */
        TimedOperationKey getActiveOperation();

        /**
         * Ages off any timing information created before the given cutoff (epoch milliseconds).
         */
        void purgeTimingInfo(long cutoff);

        /** @return the creation timestamp (epoch milliseconds) of the oldest retained timing entry */
        long getEarliestTimestamp();

        /** Records the directory whose listing is currently in progress. */
        void setActiveDirectory(String directory);

        /** Clears the currently-listed directory. */
        void completeActiveDirectory();

        /** @return the directory currently being listed, or {@code null} if none */
        String getActiveDirectory();

        /** @return the epoch milliseconds at which listing of the active directory began, or -1 if none */
        long getActiveDirectoryStartTime();

        /** @return the number of files for which timing information is currently retained */
        int getTrackedFileCount();
    }
+
+
    /**
     * Aggregated timing statistics for a single {@link DiskOperation}, including any outliers
     * (files whose operation time was unusually long).
     */
    interface OperationStatistics {
        /** @return the minimum recorded operation time, in milliseconds */
        long getMin();
        /** @return the maximum recorded operation time, in milliseconds */
        long getMax();
        /** @return the number of recorded operation times */
        long getCount();
        /** @return the mean operation time, in milliseconds */
        double getAverage();
        /** @return the standard deviation of the recorded operation times */
        double getStandardDeviation();

        /** @return a mapping of full file path to operation time for files whose times were unusually long */
        Map<String, Long> getOutliers();

        // Shared instance returned when no timing information has been recorded.
        OperationStatistics EMPTY = new OperationStatistics() {
            @Override
            public long getMin() {
                return 0;
            }

            @Override
            public long getMax() {
                return 0;
            }

            @Override
            public long getCount() {
                return 0;
            }

            @Override
            public double getAverage() {
                return 0;
            }

            @Override
            public double getStandardDeviation() {
                return 0;
            }

            @Override
            public Map<String, Long> getOutliers() {
                return Collections.emptyMap();
            }
        };
    }
+
+    private static class StandardOperationStatistics implements 
OperationStatistics {
+        private final long min;
+        private final long max;
+        private final long count;
+        private final double average;
+        private final double stdDev;
+        private final Map<String, Long> outliers;
+
+        public StandardOperationStatistics(final long min, final long max, 
final long count, final double average, final double stdDev, final Map<String, 
Long> outliers) {
+            this.min = min;
+            this.max = max;
+            this.count = count;
+            this.average = average;
+            this.stdDev = stdDev;
+            this.outliers = outliers;
+        }
+
+        public long getMin() {
+            return min;
+        }
+
+        public long getMax() {
+            return max;
+        }
+
+        public long getCount() {
+            return count;
+        }
+
+        public double getAverage() {
+            return average;
+        }
+
+        public double getStandardDeviation() {
+            return stdDev;
+        }
+
+        public Map<String, Long> getOutliers() {
+            return outliers;
+        }
+    }
+
+
+    private static class TimedOperationKey {
+        private final DiskOperation operation;
+        private final String directory;
+        private final String filename;
+        private final long startTime;
+
+        public TimedOperationKey(final DiskOperation operation, final String 
directory, final String filename, final long startTime) {
+            this.operation = operation;
+            this.startTime = startTime;
+            this.directory = directory;
+            this.filename = filename;
+        }
+
+        public DiskOperation getOperation() {
+            return operation;
+        }
+
+        public String getDirectory() {
+            return directory;
+        }
+
+        public String getFilename() {
+            return filename;
+        }
+
+        public long getStartTime() {
+            return startTime;
+        }
+    }
+
    /**
     * The categories of disk access that this processor times individually. Each TimingInfo keeps one
     * duration slot per constant (indexed by ordinal).
     */
    private enum DiskOperation {
        RETRIEVE_BASIC_ATTRIBUTES,
        RETRIEVE_OWNER_ATTRIBUTES,
        RETRIEVE_POSIX_ATTRIBUTES,
        CHECK_HIDDEN,    // the File.isHidden() check performed while filtering
        CHECK_READABLE,  // the Files.isReadable() check performed while filtering
        FILTER,
        RETRIEVE_NEXT_FILE_FROM_OS;
    }
+
    /**
     * Thrown internally to abort an in-progress listing when the processor has been stopped; the listing
     * code catches it and returns an empty listing instead of propagating the failure.
     */
    private static class ProcessorStoppedException extends RuntimeException {
    }
+
+    static class MonitorActiveTasks implements Runnable {
+        private final PerformanceTracker performanceTracker;
+        private final ComponentLog logger;
+        private final long maxDiskOperationMillis;
+        private final long maxListingMillis;
+        private final long millisToKeepStats;
+        private long lastPurgeTimestamp = 0L;
+
+        public MonitorActiveTasks(final PerformanceTracker tracker, final 
ComponentLog logger, final long maxDiskOperationMillis, final long 
maxListingMillis, final long millisToKeepStats) {
+            this.performanceTracker = tracker;
+            this.logger = logger;
+            this.maxDiskOperationMillis = maxDiskOperationMillis;
+            this.maxListingMillis = maxListingMillis;
+            this.millisToKeepStats = millisToKeepStats;
+        }
+
+        @Override
+        public void run() {
+            monitorActiveOperation();
+            monitorActiveDirectory();
+
+            final long now = System.currentTimeMillis();
+            final long millisSincePurge = now - lastPurgeTimestamp;
+            if (millisSincePurge > TimeUnit.SECONDS.toMillis(60)) {
+                performanceTracker.purgeTimingInfo(now - millisToKeepStats);
+                lastPurgeTimestamp = System.currentTimeMillis();
+            }
+        }
+
+        private void monitorActiveOperation() {
+            final TimedOperationKey activeOperation = 
performanceTracker.getActiveOperation();
+            if (activeOperation == null) {
+                return;
+            }
+
+            final long activeTime = System.currentTimeMillis() - 
activeOperation.getStartTime();
+            if (activeTime > maxDiskOperationMillis) {
+                final String directory = activeOperation.getDirectory();
+                final String filename = activeOperation.getFilename();
+
+                final String fullPath;
+                if (directory.isEmpty()) {
+                    fullPath = filename;
+                } else {
+                    fullPath = directory.endsWith("/") ? directory + filename 
: directory + "/" + filename;
+                }
+
+                logger.warn("This Processor has currently spent {} 
milliseconds performing the {} action on {}, which exceeds the configured 
threshold of {} milliseconds",
+                    new Object[] {activeTime, activeOperation.getOperation(), 
fullPath, maxDiskOperationMillis});
+            }
+        }
+
+        private void monitorActiveDirectory() {
+            final String activeDirectory = 
performanceTracker.getActiveDirectory();
+            final long startTime = 
performanceTracker.getActiveDirectoryStartTime();
+            if (startTime <= 0) {
+                return;
+            }
+
+            final long activeMillis = System.currentTimeMillis() - startTime;
+            if (activeMillis > maxListingMillis) {
+                final String fullPath = activeDirectory.isEmpty() ? "the base 
directory" : activeDirectory;
+                logger.warn("This processor has currently spent {} 
milliseconds performing the listing of {}, which exceeds the configured 
threshold of {} milliseconds",
+                    new Object[] {activeMillis, fullPath, maxListingMillis});
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/72ea93a6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index bf2755b..b9276be 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -17,10 +17,22 @@
 
 package org.apache.nifi.processors.standard;
 
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
+import org.apache.nifi.processors.standard.util.FileInfo;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.Description;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -28,6 +40,8 @@ import java.io.IOException;
 import java.nio.file.FileStore;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileTime;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -38,27 +52,15 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.util.list.AbstractListProcessor;
-import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
-import org.apache.nifi.processors.standard.util.FileInfo;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.Description;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestListFile {
 
@@ -160,13 +162,99 @@ public class TestListFile {
         dumpState.dumpState(startedAtMillis);
     }
 
+
    @Test
    @Ignore("Intended only for manual testing, as is very expensive to run as a unit test. Performs listing of 1,000,000 files (doesn't actually create the files, though - injects them in) to " +
        "ensure performance is not harmed")
    public void testPerformanceOnLargeListing() {
        // Build 1,000,000 synthetic paths (1000 directories x 1000 files) without touching the disk.
        final List<Path> paths = new ArrayList<>(1_000_000);
        final File base = new File("target");

        for (int firstLevel=0; firstLevel < 1000; firstLevel++) {
            final File dir = new File(base, String.valueOf(firstLevel));

            for (int secondLevel = 0; secondLevel < 1000; secondLevel++) {
                final File file = new File(dir, String.valueOf(secondLevel));
                paths.add(file.toPath());
            }
        }

        // Stub attributes returned for every injected path; timestamps are 'now' so no file is filtered out by age.
        final BasicFileAttributes basicFileAttributes = new BasicFileAttributes() {
            @Override
            public FileTime lastModifiedTime() {
                return FileTime.fromMillis(System.currentTimeMillis());
            }

            @Override
            public FileTime lastAccessTime() {
                return FileTime.fromMillis(System.currentTimeMillis());
            }

            @Override
            public FileTime creationTime() {
                return FileTime.fromMillis(System.currentTimeMillis());
            }

            @Override
            public boolean isRegularFile() {
                return false;
            }

            @Override
            public boolean isDirectory() {
                return false;
            }

            @Override
            public boolean isSymbolicLink() {
                return false;
            }

            @Override
            public boolean isOther() {
                return false;
            }

            @Override
            public long size() {
                return 0;
            }

            @Override
            public Object fileKey() {
                return null;
            }
        };

        // Override getPathStream so that the listing consumes the injected paths instead of scanning the disk.
        processor = new ListFile() {
            @Override
            protected Stream<Path> getPathStream(final Path basePath, final int maxDepth, final BiPredicate<Path, BasicFileAttributes> matcher) throws IOException {
                return paths.stream()
                    .filter(path -> matcher.test(path, basicFileAttributes));
            }
        };

        runner = TestRunners.newTestRunner(processor);
        runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, AbstractListProcessor.PRECISION_SECONDS.getValue());
        runner.setProperty(ListFile.TRACK_PERFORMANCE, "true");
        runner.setProperty(ListFile.MAX_TRACKED_FILES, "100000");
        runner.setProperty(ListFile.DIRECTORY, "target");

        runner.run();

        // The tracker retains at most MAX_TRACKED_FILES entries even though 1,000,000 files were listed.
        final ListFile.PerformanceTracker tracker = processor.getPerformanceTracker();
        assertEquals(100_000, tracker.getTrackedFileCount());

        // With millisToKeepStats of 1, repeated runs of the monitor task age off every tracked entry.
        final ListFile.MonitorActiveTasks monitorActiveTasks = new ListFile.MonitorActiveTasks(tracker, runner.getLogger(), 1000, 1000, 1);

        while (tracker.getTrackedFileCount() > 0) {
            monitorActiveTasks.run();
        }

        assertEquals(0, tracker.getTrackedFileCount());
    }
 
+
     @Test
     public void testGetPath() {
         runner.setProperty(ListFile.DIRECTORY, "/dir/test1");
@@ -636,14 +724,14 @@ public class TestListFile {
 
     @Test
     public void testIsListingResetNecessary() throws Exception {
-        assertEquals(true, 
processor.isListingResetNecessary(ListFile.DIRECTORY));
-        assertEquals(true, 
processor.isListingResetNecessary(ListFile.RECURSE));
-        assertEquals(true, 
processor.isListingResetNecessary(ListFile.FILE_FILTER));
-        assertEquals(true, 
processor.isListingResetNecessary(ListFile.PATH_FILTER));
-        assertEquals(true, 
processor.isListingResetNecessary(ListFile.MIN_AGE));
-        assertEquals(true, 
processor.isListingResetNecessary(ListFile.MAX_AGE));
-        assertEquals(true, 
processor.isListingResetNecessary(ListFile.MIN_SIZE));
-        assertEquals(true, 
processor.isListingResetNecessary(ListFile.MAX_SIZE));
+        assertTrue(processor.isListingResetNecessary(ListFile.DIRECTORY));
+        assertTrue(processor.isListingResetNecessary(ListFile.RECURSE));
+        assertTrue(processor.isListingResetNecessary(ListFile.FILE_FILTER));
+        assertTrue(processor.isListingResetNecessary(ListFile.PATH_FILTER));
+        assertTrue(processor.isListingResetNecessary(ListFile.MIN_AGE));
+        assertTrue(processor.isListingResetNecessary(ListFile.MAX_AGE));
+        assertTrue(processor.isListingResetNecessary(ListFile.MIN_SIZE));
+        assertTrue(processor.isListingResetNecessary(ListFile.MAX_SIZE));
         assertEquals(true, 
processor.isListingResetNecessary(ListFile.IGNORE_HIDDEN_FILES));
         assertEquals(false, processor.isListingResetNecessary(new 
PropertyDescriptor.Builder().name("x").build()));
     }

Reply via email to