Repository: nifi Updated Branches: refs/heads/master 8ebb4d197 -> 72ea93a65
NIFI-5868: Added instrumentation around ListFile such that all disk accesses are timed and any unusually long listing times or disk access operations can be logged. Additionally, information is logged at a debug level including significant amounts of troubleshooting information when configured to do so This closes #3202. Signed-off-by: Bryan Bende <bbe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/72ea93a6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/72ea93a6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/72ea93a6 Branch: refs/heads/master Commit: 72ea93a657cd673bfc40ef74e172f5efb950e95d Parents: 8ebb4d1 Author: Mark Payne <marka...@hotmail.com> Authored: Wed Dec 5 09:37:20 2018 -0500 Committer: Bryan Bende <bbe...@apache.org> Committed: Wed Dec 5 13:01:14 2018 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/ListFile.java | 974 +++++++++++++++++-- .../nifi/processors/standard/TestListFile.java | 154 ++- 2 files changed, 1032 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/72ea93a6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java index 8b73b97..42e2d3f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java @@ -27,12 +27,15 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; import org.apache.nifi.components.state.Scope; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessorInitializationContext; @@ -41,6 +44,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.list.AbstractListProcessor; import org.apache.nifi.processor.util.list.ListedEntityTracker; import org.apache.nifi.processors.standard.util.FileInfo; +import org.apache.nifi.util.Tuple; import java.io.File; import java.io.IOException; @@ -57,21 +61,32 @@ import java.nio.file.attribute.PosixFilePermissions; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY; +import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + @TriggerSerially @InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"file", "get", "list", "ingest", "source", "filesystem"}) @@ -115,15 +130,15 @@ public class ListFile extends AbstractListProcessor<FileInfo> { static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "Input Directory is located on a remote system. State will be stored across the cluster so that " + "the listing can be performed on Primary Node Only and another node can pick up where the last node left off, if the Primary Node changes"); - public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + public static final PropertyDescriptor DIRECTORY = new Builder() .name("Input Directory") .description("The input directory from which files to pull files") .required(true) .addValidator(StandardValidators.createDirectoryExistsValidator(true, false)) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .expressionLanguageSupported(VARIABLE_REGISTRY) .build(); - public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor RECURSE = new Builder() .name("Recurse Subdirectories") .description("Indicates whether to list files from subdirectories of the directory") .required(true) @@ -131,7 +146,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> { .defaultValue("true") .build(); - public static final PropertyDescriptor DIRECTORY_LOCATION = new 
PropertyDescriptor.Builder() + public static final PropertyDescriptor DIRECTORY_LOCATION = new Builder() .name("Input Directory Location") .description("Specifies where the Input Directory is located. This is used to determine whether state should be stored locally or across the cluster.") .allowableValues(LOCATION_LOCAL, LOCATION_REMOTE) @@ -139,7 +154,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> { .required(true) .build(); - public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder() + public static final PropertyDescriptor FILE_FILTER = new Builder() .name("File Filter") .description("Only files whose names match the given regular expression will be picked up") .required(true) @@ -147,14 +162,14 @@ public class ListFile extends AbstractListProcessor<FileInfo> { .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); - public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PATH_FILTER = new Builder() .name("Path Filter") .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned") .required(false) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); - public static final PropertyDescriptor INCLUDE_FILE_ATTRIBUTES = new PropertyDescriptor.Builder() + public static final PropertyDescriptor INCLUDE_FILE_ATTRIBUTES = new Builder() .name("Include File Attributes") .description("Whether or not to include information such as the file's Last Modified Time and Owner as FlowFile Attributes. " + "Depending on the File System being used, gathering this information can be expensive and as a result should be disabled. 
This is especially true of remote file shares.") @@ -163,22 +178,22 @@ public class ListFile extends AbstractListProcessor<FileInfo> { .required(true) .build(); - public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor MIN_AGE = new Builder() .name("Minimum File Age") .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored") .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .addValidator(TIME_PERIOD_VALIDATOR) .defaultValue("0 sec") .build(); - public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor MAX_AGE = new Builder() .name("Maximum File Age") .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored") .required(false) .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) .build(); - public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor MIN_SIZE = new Builder() .name("Minimum File Size") .description("The minimum size that a file must be in order to be pulled") .required(true) @@ -186,14 +201,14 @@ public class ListFile extends AbstractListProcessor<FileInfo> { .defaultValue("0 B") .build(); - public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor MAX_SIZE = new Builder() .name("Maximum File Size") .description("The maximum size that a file can be in order to be pulled") .required(false) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .build(); - public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder() + public static final 
PropertyDescriptor IGNORE_HIDDEN_FILES = new Builder() .name("Ignore Hidden Files") .description("Indicates whether or not hidden files should be ignored") .allowableValues("true", "false") @@ -201,11 +216,63 @@ public class ListFile extends AbstractListProcessor<FileInfo> { .required(true) .build(); + public static final PropertyDescriptor TRACK_PERFORMANCE = new Builder() + .name("track-performance") + .displayName("Track Performance") + .description("Whether or not the Processor should track the performance of disk access operations. If true, all accesses to disk will be recorded, including the file being accessed, the " + + "information being obtained, and how long it takes. This is then logged periodically at a DEBUG level. While the amount of data will be capped, " + + "this option may still consume a significant amount of heap (controlled by the 'Maximum Number of Files to Track' property), " + + "but it can be very useful for troubleshooting purposes if performance is poor is degraded.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor MAX_TRACKED_FILES = new Builder() + .name("max-performance-metrics") + .displayName("Maximum Number of Files to Track") + .description("If the 'Track Performance' property is set to 'true', this property indicates the maximum number of files whose performance metrics should be held onto. 
A smaller value for " + + "this property will result in less heap utilization, while a larger value may provide more accurate insights into how the disk access operations are performing") + .required(true) + .addValidator(POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("100000") + .build(); + + public static final PropertyDescriptor MAX_DISK_OPERATION_TIME = new Builder() + .name("max-operation-time") + .displayName("Max Disk Operation Time") + .description("The maximum amount of time that any single disk operation is expected to take. If any disk operation takes longer than this amount of time, a warning bulletin will be " + + "generated for each operation that exceeds this amount of time.") + .required(false) + .addValidator(TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(VARIABLE_REGISTRY) + .defaultValue("10 secs") + .build(); + + public static final PropertyDescriptor MAX_LISTING_TIME = new Builder() + .name("max-listing-time") + .displayName("Max Directory Listing Time") + .description("The maximum amount of time that listing any single directory is expected to take. 
If the listing for the directory specified by the 'Input Directory' property, " + + "or the listing of any subdirectory (if 'Recurse' is set to true) takes longer than this amount of time, a warning bulletin will be generated for each directory listing " + + "that exceeds this amount of time.") + .required(false) + .addValidator(TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(VARIABLE_REGISTRY) + .defaultValue("3 mins") + .build(); + + private List<PropertyDescriptor> properties; private Set<Relationship> relationships; + private volatile ScheduledExecutorService monitoringThreadPool; + private volatile Future<?> monitoringFuture; + private volatile boolean includeFileAttributes; - private final AtomicReference<BiPredicate<Path, BasicFileAttributes>> fileFilterRef = new AtomicReference<BiPredicate<Path, BasicFileAttributes>>(); + private volatile PerformanceTracker performanceTracker; + private volatile long performanceLoggingTimestamp = System.currentTimeMillis(); + private final AtomicReference<BiPredicate<Path, BasicFileAttributes>> fileFilterRef = new AtomicReference<>(); public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime"; public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime"; @@ -219,8 +286,8 @@ public class ListFile extends AbstractListProcessor<FileInfo> { @Override protected void init(final ProcessorInitializationContext context) { final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(LISTING_STRATEGY); properties.add(DIRECTORY); + properties.add(LISTING_STRATEGY); properties.add(RECURSE); properties.add(DIRECTORY_LOCATION); properties.add(FILE_FILTER); @@ -236,11 +303,23 @@ public class ListFile extends AbstractListProcessor<FileInfo> { properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW); properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET); properties.add(ListedEntityTracker.NODE_IDENTIFIER); + properties.add(TRACK_PERFORMANCE); + 
properties.add(MAX_TRACKED_FILES); + properties.add(MAX_DISK_OPERATION_TIME); + properties.add(MAX_LISTING_TIME); this.properties = Collections.unmodifiableList(properties); final Set<Relationship> relationships = new HashSet<>(); relationships.add(REL_SUCCESS); this.relationships = Collections.unmodifiableSet(relationships); + + monitoringThreadPool = Executors.newScheduledThreadPool(1, r -> { + final Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("Monitor ListFile Performance [UUID=" + context.getIdentifier() + "]"); + t.setDaemon(true); + + return t; + }); } @Override @@ -257,8 +336,76 @@ public class ListFile extends AbstractListProcessor<FileInfo> { public void onScheduled(final ProcessContext context) { fileFilterRef.set(createFileFilter(context)); includeFileAttributes = context.getProperty(INCLUDE_FILE_ATTRIBUTES).asBoolean(); + + final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); + final long maxListingMillis = context.getProperty(MAX_LISTING_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); + + final boolean trackPerformance = context.getProperty(TRACK_PERFORMANCE).asBoolean(); + if (trackPerformance) { + final int maxEntries = context.getProperty(MAX_TRACKED_FILES).evaluateAttributeExpressions().asInteger(); + performanceTracker = new RollingMetricPerformanceTracker(getLogger(), maxDiskOperationMillis, maxEntries); + } else { + performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis); + } + + final long millisToKeepStats = TimeUnit.MINUTES.toMillis(15); + final MonitorActiveTasks monitorTask = new MonitorActiveTasks(performanceTracker, getLogger(), maxDiskOperationMillis, maxListingMillis, millisToKeepStats); + monitoringFuture = monitoringThreadPool.scheduleAtFixedRate(monitorTask, 15, 15, TimeUnit.SECONDS); + } + + @OnStopped + public void onStopped(final ProcessContext 
context) { + if (monitoringFuture != null) { + monitoringFuture.cancel(true); + } + + final boolean trackPerformance = context.getProperty(TRACK_PERFORMANCE).asBoolean(); + if (trackPerformance) { + logPerformance(); + } + } + + protected PerformanceTracker getPerformanceTracker() { + return performanceTracker; } + public void logPerformance() { + final ComponentLog logger = getLogger(); + if (!logger.isDebugEnabled()) { + return; + } + + final long earliestTimestamp = performanceTracker.getEarliestTimestamp(); + final long millis = System.currentTimeMillis() - earliestTimestamp; + final long seconds = TimeUnit.MILLISECONDS.toSeconds(millis); + + for (final DiskOperation operation : DiskOperation.values()) { + final OperationStatistics stats = performanceTracker.getOperationStatistics(operation); + + final StringBuilder sb = new StringBuilder(); + if (stats.getCount() == 0) { + sb.append("Over the past ").append(seconds).append(" seconds, for Operation '").append(operation).append("' there were no operations performed"); + } else { + sb.append("Over the past ").append(seconds).append(" seconds, For Operation '").append(operation).append("' there were ") + .append(stats.getCount()).append(" operations performed with an average time of ") + .append(stats.getAverage()).append(" milliseconds; Standard Deviation = ").append(stats.getStandardDeviation()).append(" millis; Min Time = ") + .append(stats.getMin()).append(" millis, Max Time = ").append(stats.getMax()).append(" millis"); + + if (logger.isDebugEnabled()) { + final Map<String, Long> outliers = stats.getOutliers(); + + sb.append("; ").append(stats.getOutliers().size()).append(" significant outliers: "); + sb.append(outliers); + } + } + + logger.debug(sb.toString()); + } + + performanceLoggingTimestamp = System.currentTimeMillis(); + } + + @Override protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) { final Map<String, String> attributes = new HashMap<>(); @@ 
-284,34 +431,43 @@ public class ListFile extends AbstractListProcessor<FileInfo> { attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(fileInfo.getLastModifiedTime()))); if (includeFileAttributes) { + final TimingInfo timingInfo = performanceTracker.getTimingInfo(relativePath.toString(), file.getName()); + try { FileStore store = Files.getFileStore(filePath); - if (store.supportsFileAttributeView("basic")) { - try { - BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class); - BasicFileAttributes attrs = view.readAttributes(); - attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis()))); - attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis()))); - } catch (Exception ignore) { - } // allow other attributes if these fail - } - if (store.supportsFileAttributeView("owner")) { - try { - FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class); - attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName()); - } catch (Exception ignore) { - } // allow other attributes if these fail - } + timingInfo.timeOperation(DiskOperation.RETRIEVE_BASIC_ATTRIBUTES, () -> { + if (store.supportsFileAttributeView("basic")) { + try { + BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class); + BasicFileAttributes attrs = view.readAttributes(); + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis()))); + attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis()))); + } catch (Exception ignore) { + } // allow other attributes if these fail + } + }); - if (store.supportsFileAttributeView("posix")) { - try { - PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class); - attributes.put(FILE_PERMISSIONS_ATTRIBUTE, 
PosixFilePermissions.toString(view.readAttributes().permissions())); - attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName()); - } catch (Exception ignore) { - } // allow other attributes if these fail - } + timingInfo.timeOperation(DiskOperation.RETRIEVE_OWNER_ATTRIBUTES, () -> { + if (store.supportsFileAttributeView("owner")) { + try { + FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class); + attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName()); + } catch (Exception ignore) { + } // allow other attributes if these fail + } + }); + + timingInfo.timeOperation(DiskOperation.RETRIEVE_POSIX_ATTRIBUTES, () -> { + if (store.supportsFileAttributeView("posix")) { + try { + PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class); + attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions())); + attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName()); + } catch (Exception ignore) { + } // allow other attributes if these fail + } + }); } catch (IOException ioe) { // well then this FlowFile gets none of these attributes getLogger().warn("Error collecting attributes for file {}, message is {}", new Object[] {absPathString, ioe.getMessage()}); @@ -338,36 +494,94 @@ public class ListFile extends AbstractListProcessor<FileInfo> { @Override protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException { - final Path path = new File(getPath(context)).toPath(); + final Path basePath = new File(getPath(context)).toPath(); final Boolean recurse = context.getProperty(RECURSE).asBoolean(); final Map<Path, BasicFileAttributes> lastModifiedMap = new HashMap<>(); final BiPredicate<Path, BasicFileAttributes> fileFilter = fileFilterRef.get(); int maxDepth = recurse ? 
Integer.MAX_VALUE : 1; - BiPredicate<Path, BasicFileAttributes> matcher = (p, attributes) -> { - if (!attributes.isDirectory() - && (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp) - && fileFilter.test(p, attributes)) { - // We store the attributes for each Path we are returning in order to avoid to - // retrieve them again later when creating the FileInfo - lastModifiedMap.put(p, attributes); - return true; - } - return false; + + final BiPredicate<Path, BasicFileAttributes> matcher = new BiPredicate<Path, BasicFileAttributes>() { + private long lastTimestamp = System.currentTimeMillis(); + + @Override + public boolean test(final Path path, final BasicFileAttributes attributes) { + if (!isScheduled()) { + throw new ProcessorStoppedException(); + } + + final long now = System.currentTimeMillis(); + final long timeToList = now - lastTimestamp; + lastTimestamp = now; + + final Path relativeDirectory = basePath.relativize(path).getParent(); + final String relativePath = relativeDirectory == null ? 
"" : relativeDirectory.toString(); + final String filename = path.getFileName().toString(); + performanceTracker.acceptOperation(DiskOperation.RETRIEVE_NEXT_FILE_FROM_OS, relativePath, filename, timeToList); + + final boolean isDirectory = attributes.isDirectory(); + if (isDirectory) { + performanceTracker.setActiveDirectory(relativePath); + } + + final TimedOperationKey operationKey = performanceTracker.beginOperation(DiskOperation.FILTER, relativePath, filename); + + try { + if (!isDirectory && (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp) + && fileFilter.test(path, attributes)) { + // We store the attributes for each Path we are returning in order to avoid to + // retrieve them again later when creating the FileInfo + lastModifiedMap.put(path, attributes); + + return true; + } + + return false; + } finally { + performanceTracker.completeOperation(operationKey); + + if (TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - performanceLoggingTimestamp) >= 5) { + logPerformance(); + } + } + } }; - Stream<Path> inputStream = Files.find(path, maxDepth, matcher, FileVisitOption.FOLLOW_LINKS); - Stream<FileInfo> listing = inputStream.map(p -> { + + final Stream<Path> inputStream = getPathStream(basePath, maxDepth, matcher); + + final Stream<FileInfo> listing = inputStream.map(p -> { File file = p.toFile(); BasicFileAttributes attributes = lastModifiedMap.get(p); - return new FileInfo.Builder() + + final FileInfo fileInfo = new FileInfo.Builder() .directory(false) .filename(file.getName()) .fullPathFileName(file.getAbsolutePath()) .lastModifiedTime(attributes.lastModifiedTime().toMillis()) .size(attributes.size()) .build(); + + return fileInfo; }); - return listing.collect(Collectors.toList()); + + // Perform the actual listing + try { + final long start = System.currentTimeMillis(); + final List<FileInfo> fileInfos = listing.collect(Collectors.toList()); + final long millis = System.currentTimeMillis() - start; + + 
getLogger().debug("Took {} milliseconds to perform listing and gather {} entries", new Object[] {millis, fileInfos.size()}); + return fileInfos; + } catch (final ProcessorStoppedException pse) { + getLogger().info("Processor was stopped so will not complete listing of Files"); + return Collections.emptyList(); + } finally { + performanceTracker.completeActiveDirectory(); + } + } + + protected Stream<Path> getPathStream(final Path basePath, final int maxDepth, final BiPredicate<Path, BasicFileAttributes> matcher) throws IOException { + return Files.find(basePath, maxDepth, matcher, FileVisitOption.FOLLOW_LINKS); } @Override @@ -389,11 +603,15 @@ public class ListFile extends AbstractListProcessor<FileInfo> { final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean(); - final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue()); + final String fileFilter = context.getProperty(FILE_FILTER).getValue(); + final Pattern filePattern = Pattern.compile(fileFilter); final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue(); final boolean recurseDirs = context.getProperty(RECURSE).asBoolean(); final String pathPatternStr = context.getProperty(PATH_FILTER).getValue(); final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? 
null : Pattern.compile(pathPatternStr); + + final Path basePath = Paths.get(indir); + return (path, attributes) -> { if (minSize > attributes.size()) { return false; @@ -408,22 +626,652 @@ public class ListFile extends AbstractListProcessor<FileInfo> { if (maxAge != null && maxAge < fileAge) { return false; } - if (ignoreHidden && path.toFile().isHidden()) { - return false; - } + + final Path relativePath = basePath.relativize(path).getParent(); + final String relativeDir = relativePath == null ? "" : relativePath.toString(); + final String filename = path.getFileName().toString(); + final TimingInfo timingInfo = performanceTracker.getTimingInfo(relativeDir, filename); + + final File file = path.toFile(); + if (pathPattern != null) { - Path reldir = Paths.get(indir).relativize(path).getParent(); - if (reldir != null && !reldir.toString().isEmpty()) { - if (!pathPattern.matcher(reldir.toString()).matches()) { + if (relativePath != null && !relativePath.toString().isEmpty()) { + if (!pathPattern.matcher(relativePath.toString()).matches()) { return false; } } } + + final boolean matchesFilter = filePattern.matcher(filename).matches(); + if (!matchesFilter) { + return false; + } + // Verify that we have at least read permissions on the file we're considering grabbing - if (!Files.isReadable(path)) { + if (!timingInfo.timeOperation(DiskOperation.CHECK_READABLE, () -> Files.isReadable(path))) { + return false; + } + + if (ignoreHidden && timingInfo.timeOperation(DiskOperation.CHECK_HIDDEN, file::isHidden)) { return false; } - return filePattern.matcher(path.getFileName().toString()).matches(); + + return true; }; } + + /** + * A PerformanceTracker that is capable of tracking which disk access operation is active and which directory is actively being listed, + * as well as timing specific operations, but does not track metrics over any amount of time. 
This implementation does not provide the ability + * to glean information such as which operations or files are taking the longest to operate on but uses very little heap. + */ + public static class UntrackedPerformanceTracker implements PerformanceTracker { + private TimedOperationKey activeOperation = null; + private String activeDirectory; + private long activeDirectoryStartTime = -1L; + + private final ComponentLog logger; + private final long maxDiskOperationMillis; + + public UntrackedPerformanceTracker(final ComponentLog logger, final long maxDiskOperationMillis) { + this.logger = logger; + this.maxDiskOperationMillis = maxDiskOperationMillis; + } + + @Override + public TimedOperationKey beginOperation(final DiskOperation operation, final String directory, final String filename) { + return null; + } + + @Override + public void completeOperation(final TimedOperationKey operationKey) { + } + + @Override + public void acceptOperation(final DiskOperation operation, final String directory, final String filename, final long millis) { + } + + @Override + public TimingInfo getTimingInfo(final String directory, final String filename) { + return new TimingInfo(directory, filename, this, logger, maxDiskOperationMillis); + } + + @Override + public OperationStatistics getOperationStatistics(final DiskOperation operation) { + return OperationStatistics.EMPTY; + } + + @Override + public synchronized void setActiveOperation(final TimedOperationKey operationKey) { + this.activeOperation = operationKey; + } + + @Override + public synchronized void completeActiveOperation() { + this.activeOperation = null; + } + + @Override + public synchronized TimedOperationKey getActiveOperation() { + return activeOperation; + } + + @Override + public void purgeTimingInfo(final long cutoff) { + } + + @Override + public long getEarliestTimestamp() { + return System.currentTimeMillis(); + } + + @Override + public synchronized void setActiveDirectory(final String directory) { + activeDirectory 
= directory; + activeDirectoryStartTime = System.currentTimeMillis(); + } + + @Override + public synchronized void completeActiveDirectory() { + activeDirectory = null; + activeDirectoryStartTime = -1L; + } + + @Override + public synchronized long getActiveDirectoryStartTime() { + return activeDirectoryStartTime; + } + + @Override + public synchronized String getActiveDirectory() { + return activeDirectory; + } + + @Override + public int getTrackedFileCount() { + return 0; + } + } + + /** + * Tracks metrics using a rolling window of time, in which older metrics are 'aged off' by calling {@link #purgeTimingInfo(long)}. Tracking these metrics allows information + * to be gleaned, such as which files are expensive to operate on or which operations are most expensive. However, the heap utilization is significant. + */ + public static final class RollingMetricPerformanceTracker implements PerformanceTracker { + private final Map<String, String> directoryCanonicalization = new HashMap<>(); + private final Map<Tuple<String, String>, TimingInfo> directoryToTimingInfo; + private TimedOperationKey activeOperation; + private long earliestTimestamp = System.currentTimeMillis(); + private final long maxDiskOperationMillis; + private final ComponentLog logger; + + private String activeDirectory; + private long activeDirectoryStartTime = -1L; + + public RollingMetricPerformanceTracker(final ComponentLog logger, final long maxDiskOperationMillis, final int maxEntries) { + this.logger = logger; + this.maxDiskOperationMillis = maxDiskOperationMillis; + + directoryToTimingInfo = new LinkedHashMap<Tuple<String, String>, TimingInfo>() { + @Override + protected boolean removeEldestEntry(final Map.Entry<Tuple<String, String>, TimingInfo> eldest) { + return size() > maxEntries; + } + }; + } + + @Override + public synchronized TimedOperationKey beginOperation(final DiskOperation operation, final String directory, final String filename) { + return new TimedOperationKey(operation, directory, 
filename, System.currentTimeMillis()); + } + + @Override + public synchronized void completeOperation(final TimedOperationKey operationKey) { + final TimingInfo timingInfo = getTimingInfo(operationKey.getDirectory(), operationKey.getFilename()); + timingInfo.accept(operationKey.getOperation(), System.currentTimeMillis() - operationKey.getStartTime()); + } + + @Override + public synchronized void acceptOperation(final DiskOperation operation, final String directory, final String filename, final long millis) { + final String canonicalDirectory = directoryCanonicalization.computeIfAbsent(directory, key -> directory); + final Tuple<String, String> key = new Tuple<>(canonicalDirectory, filename); + final TimingInfo timingInfo = directoryToTimingInfo.computeIfAbsent(key, k -> new TimingInfo(directory, filename, this, logger, maxDiskOperationMillis)); + timingInfo.accept(operation, millis); + } + + @Override + public synchronized TimingInfo getTimingInfo(final String directory, final String filename) { + final String canonicalDirectory = directoryCanonicalization.computeIfAbsent(directory, key -> directory); + final Tuple<String, String> key = new Tuple<>(canonicalDirectory, filename); + final TimingInfo timingInfo = directoryToTimingInfo.computeIfAbsent(key, k -> new TimingInfo(directory, filename, this, logger, maxDiskOperationMillis)); + + return timingInfo; + } + + @Override + public void setActiveOperation(final TimedOperationKey activeOperation) { + this.activeOperation = activeOperation; + } + + @Override + public void completeActiveOperation() { + this.activeOperation = null; + } + + @Override + public synchronized TimedOperationKey getActiveOperation() { + return activeOperation; + } + + @Override + public synchronized void setActiveDirectory(final String directory) { + activeDirectory = directory; + activeDirectoryStartTime = System.currentTimeMillis(); + } + + @Override + public synchronized void completeActiveDirectory() { + activeDirectory = null; + 
activeDirectoryStartTime = -1L; + } + + @Override + public synchronized long getActiveDirectoryStartTime() { + return activeDirectoryStartTime; + } + + @Override + public synchronized String getActiveDirectory() { + return activeDirectory; + } + + @Override + public synchronized int getTrackedFileCount() { + return directoryToTimingInfo.size(); + } + + @Override + public synchronized void purgeTimingInfo(final long cutoff) { + logger.debug("Purging any entries from Performance Tracker that is older than {}", new Object[] {new Date(cutoff)}); + final Iterator<Map.Entry<Tuple<String, String>, TimingInfo>> itr = directoryToTimingInfo.entrySet().iterator(); + + int purgedCount = 0; + long earliestTimestamp = System.currentTimeMillis(); + while (itr.hasNext()) { + final Map.Entry<Tuple<String, String>, TimingInfo> entry = itr.next(); + final TimingInfo timingInfo = entry.getValue(); + final long creationTime = timingInfo.getCreationTimestamp(); + + if (creationTime < cutoff) { + itr.remove(); + purgedCount++; + + directoryCanonicalization.remove(entry.getKey().getKey()); + } else { + earliestTimestamp = Math.min(earliestTimestamp, creationTime); + } + } + + this.earliestTimestamp = earliestTimestamp; + logger.debug("Purged {} entries from Performance Tracker; now holding {} entries", new Object[] {purgedCount, directoryToTimingInfo.size()}); + } + + public long getEarliestTimestamp() { + return earliestTimestamp; + } + + public synchronized OperationStatistics getOperationStatistics(final DiskOperation operation) { + long count = 0L; + long sum = 0L; + long min = 0L; + long max = 0L; + + // Calculate min/max/mean + for (final TimingInfo timingInfo : directoryToTimingInfo.values()) { + final long operationTime = timingInfo.getOperationTime(operation); + + if (operationTime < 0) { // operation not conducted + continue; + } + + sum += operationTime; + + if (count++ == 0) { + min = operationTime; + max = operationTime; + } else { + min = Math.min(min, operationTime); + max 
= Math.max(max, operationTime); + } + } + + if (count == 0) { + return OperationStatistics.EMPTY; + } + + double average = (double) sum / (double) count; + + // Calculate Standard Deviation + final double stdDeviation = calculateStdDev(average, (double) count, operation); + final double outlierCutoff = average + 2 * stdDeviation; + + final Map<String, Long> outliers = new HashMap<>(); + for (final TimingInfo timingInfo : directoryToTimingInfo.values()) { + final long operationTime = timingInfo.getOperationTime(operation); + + if (operationTime > 2 && operationTime > outlierCutoff) { + final String directory = timingInfo.getDirectory(); + final String filename = timingInfo.getFilename(); + final String fullPath = directory.endsWith("/") ? directory + filename : directory + "/" + filename; + outliers.put(fullPath, operationTime); + } + } + + return new StandardOperationStatistics(min, max, count, average, stdDeviation, outliers); + } + + private double calculateStdDev(final double average, final double count, final DiskOperation operation) { + double squaredDifferenceSum = 0D; + for (final TimingInfo timingInfo : directoryToTimingInfo.values()) { + final long operationTime = timingInfo.getOperationTime(operation); + if (operationTime < 0) { + continue; + } + + final double differenceSquared = Math.pow(((double) operationTime - average), 2); + squaredDifferenceSum += differenceSquared; + } + + final double squaredDifferenceAverage = squaredDifferenceSum / count; + final double stdDeviation = Math.pow(squaredDifferenceAverage, 0.5); + return stdDeviation; + } + } + + /** + * Provides a mechanism for timing how long a particular operation takes to complete, logging if it takes longer than the configured threshold. 
+ */ + private static class TimingInfo { + private final String directory; + private final String filename; + private final int[] operationTimes; + private final PerformanceTracker tracker; + private final long creationTimestamp; + private final ComponentLog logger; + private final long maxDiskOperationMillis; + + public TimingInfo(final String directory, final String filename, final PerformanceTracker tracker, final ComponentLog logger, final long maxDiskOperationMillis) { + this.directory = directory; + this.filename = filename; + this.tracker = tracker; + this.logger = logger; + this.maxDiskOperationMillis = maxDiskOperationMillis; + + this.creationTimestamp = System.currentTimeMillis(); + + operationTimes = new int[DiskOperation.values().length]; + Arrays.fill(operationTimes, -1); + } + + public String getDirectory() { + return directory; + } + + public String getFilename() { + return filename; + } + + public void accept(final DiskOperation operation, final long duration) { + operationTimes[operation.ordinal()] = (int) duration; + + if (duration > maxDiskOperationMillis) { + final String fullPath = getFullPath(); + logger.warn("This Processor completed action {} on {} in {} milliseconds, which exceeds the configured threshold of {} milliseconds", + new Object[] {operation, fullPath, duration, maxDiskOperationMillis}); + } + + if (logger.isTraceEnabled()) { + logger.trace("Performing operation {} on {} took {} milliseconds", new Object[] {operation, getFullPath(), duration}); + } + } + + private String getFullPath() { + if (directory.isEmpty()) { + return filename; + } else { + return directory.endsWith("/") ? 
directory + filename : directory + "/" + filename; + } + } + + public long getOperationTime(final DiskOperation operation) { + return operationTimes[operation.ordinal()]; + } + + private <T> T timeOperation(final DiskOperation operation, final Supplier<T> function) { + final long start = System.currentTimeMillis(); + final TimedOperationKey operationKey = new TimedOperationKey(operation, directory, filename, start); + tracker.setActiveOperation(operationKey); + + try { + final T value = function.get(); + final long millis = System.currentTimeMillis() - start; + accept(operation, millis); + return value; + } finally { + tracker.completeActiveOperation(); + } + } + + private void timeOperation(final DiskOperation operation, final Runnable task) { + final long start = System.currentTimeMillis(); + final TimedOperationKey operationKey = new TimedOperationKey(operation, directory, filename, start); + tracker.setActiveOperation(operationKey); + + try { + task.run(); + final long millis = System.currentTimeMillis() - start; + accept(operation, millis); + } finally { + tracker.completeActiveOperation(); + } + } + + public long getCreationTimestamp() { + return creationTimestamp; + } + } + + /** + * PerformanceTracker is responsible for providing a mechanism by which any disk operation can be timed and the timing information + * can both be used to issue warnings as well as be aggregated for some amount of time, in order to understand how long certain disk operations + * take and which files may be responsible for causing longer-than-usual operations to be performed. 
+ */ + interface PerformanceTracker { + TimedOperationKey beginOperation(DiskOperation operation, String directory, String filename); + + void completeOperation(TimedOperationKey operationKey); + + void acceptOperation(DiskOperation operation, String directory, String filename, long millis); + + TimingInfo getTimingInfo(String directory, String filename); + + OperationStatistics getOperationStatistics(DiskOperation operation); + + void setActiveOperation(TimedOperationKey operationKey); + + void completeActiveOperation(); + + TimedOperationKey getActiveOperation(); + + void purgeTimingInfo(long cutoff); + + long getEarliestTimestamp(); + + void setActiveDirectory(String directory); + + void completeActiveDirectory(); + + String getActiveDirectory(); + + long getActiveDirectoryStartTime(); + + int getTrackedFileCount(); + } + + + interface OperationStatistics { + long getMin(); + long getMax(); + long getCount(); + double getAverage(); + double getStandardDeviation(); + + Map<String, Long> getOutliers(); + + OperationStatistics EMPTY = new OperationStatistics() { + @Override + public long getMin() { + return 0; + } + + @Override + public long getMax() { + return 0; + } + + @Override + public long getCount() { + return 0; + } + + @Override + public double getAverage() { + return 0; + } + + @Override + public double getStandardDeviation() { + return 0; + } + + @Override + public Map<String, Long> getOutliers() { + return Collections.emptyMap(); + } + }; + } + + private static class StandardOperationStatistics implements OperationStatistics { + private final long min; + private final long max; + private final long count; + private final double average; + private final double stdDev; + private final Map<String, Long> outliers; + + public StandardOperationStatistics(final long min, final long max, final long count, final double average, final double stdDev, final Map<String, Long> outliers) { + this.min = min; + this.max = max; + this.count = count; + this.average = 
average; + this.stdDev = stdDev; + this.outliers = outliers; + } + + public long getMin() { + return min; + } + + public long getMax() { + return max; + } + + public long getCount() { + return count; + } + + public double getAverage() { + return average; + } + + public double getStandardDeviation() { + return stdDev; + } + + public Map<String, Long> getOutliers() { + return outliers; + } + } + + + private static class TimedOperationKey { + private final DiskOperation operation; + private final String directory; + private final String filename; + private final long startTime; + + public TimedOperationKey(final DiskOperation operation, final String directory, final String filename, final long startTime) { + this.operation = operation; + this.startTime = startTime; + this.directory = directory; + this.filename = filename; + } + + public DiskOperation getOperation() { + return operation; + } + + public String getDirectory() { + return directory; + } + + public String getFilename() { + return filename; + } + + public long getStartTime() { + return startTime; + } + } + + private enum DiskOperation { + RETRIEVE_BASIC_ATTRIBUTES, + RETRIEVE_OWNER_ATTRIBUTES, + RETRIEVE_POSIX_ATTRIBUTES, + CHECK_HIDDEN, + CHECK_READABLE, + FILTER, + RETRIEVE_NEXT_FILE_FROM_OS; + } + + private static class ProcessorStoppedException extends RuntimeException { + } + + static class MonitorActiveTasks implements Runnable { + private final PerformanceTracker performanceTracker; + private final ComponentLog logger; + private final long maxDiskOperationMillis; + private final long maxListingMillis; + private final long millisToKeepStats; + private long lastPurgeTimestamp = 0L; + + public MonitorActiveTasks(final PerformanceTracker tracker, final ComponentLog logger, final long maxDiskOperationMillis, final long maxListingMillis, final long millisToKeepStats) { + this.performanceTracker = tracker; + this.logger = logger; + this.maxDiskOperationMillis = maxDiskOperationMillis; + this.maxListingMillis 
= maxListingMillis; + this.millisToKeepStats = millisToKeepStats; + } + + @Override + public void run() { + monitorActiveOperation(); + monitorActiveDirectory(); + + final long now = System.currentTimeMillis(); + final long millisSincePurge = now - lastPurgeTimestamp; + if (millisSincePurge > TimeUnit.SECONDS.toMillis(60)) { + performanceTracker.purgeTimingInfo(now - millisToKeepStats); + lastPurgeTimestamp = System.currentTimeMillis(); + } + } + + private void monitorActiveOperation() { + final TimedOperationKey activeOperation = performanceTracker.getActiveOperation(); + if (activeOperation == null) { + return; + } + + final long activeTime = System.currentTimeMillis() - activeOperation.getStartTime(); + if (activeTime > maxDiskOperationMillis) { + final String directory = activeOperation.getDirectory(); + final String filename = activeOperation.getFilename(); + + final String fullPath; + if (directory.isEmpty()) { + fullPath = filename; + } else { + fullPath = directory.endsWith("/") ? directory + filename : directory + "/" + filename; + } + + logger.warn("This Processor has currently spent {} milliseconds performing the {} action on {}, which exceeds the configured threshold of {} milliseconds", + new Object[] {activeTime, activeOperation.getOperation(), fullPath, maxDiskOperationMillis}); + } + } + + private void monitorActiveDirectory() { + final String activeDirectory = performanceTracker.getActiveDirectory(); + final long startTime = performanceTracker.getActiveDirectoryStartTime(); + if (startTime <= 0) { + return; + } + + final long activeMillis = System.currentTimeMillis() - startTime; + if (activeMillis > maxListingMillis) { + final String fullPath = activeDirectory.isEmpty() ? 
"the base directory" : activeDirectory; + logger.warn("This processor has currently spent {} milliseconds performing the listing of {}, which exceeds the configured threshold of {} milliseconds", + new Object[] {activeMillis, fullPath, maxListingMillis}); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/72ea93a6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java index bf2755b..b9276be 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java @@ -17,10 +17,22 @@ package org.apache.nifi.processors.standard; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListProcessorTestWatcher; +import org.apache.nifi.processors.standard.util.FileInfo; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; 
+import org.junit.runner.Description;

 import java.io.File;
 import java.io.FileOutputStream;
@@ -28,6 +40,8 @@ import java.io.IOException;
 import java.nio.file.FileStore;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileTime;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -38,27 +52,15 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.util.list.AbstractListProcessor;
-import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
-import org.apache.nifi.processors.standard.util.FileInfo;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.Description;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;

 public class TestListFile {
@@ -160,13 +162,99 @@ public class TestListFile {
         dumpState.dumpState(startedAtMillis);
     }

+    @Test
-    public void testGetRelationships() throws Exception {
-        Set<Relationship> relationships = processor.getRelationships();
-        assertEquals(1, relationships.size());
-        assertEquals(AbstractListProcessor.REL_SUCCESS, relationships.toArray()[0]);
+    @Ignore("Intended only for manual testing, as is very expensive to run as a unit test. Performs listing of 1,000,000 files (doesn't actually create the files, though - injects them in) to "
+        + "ensure performance is not harmed")
+    public void testPerformanceOnLargeListing() {
+        final List<Path> paths = new ArrayList<>(1_000_000);
+        final File base = new File("target");
+
+        for (int firstLevel = 0; firstLevel < 1000; firstLevel++) {
+            final File dir = new File(base, String.valueOf(firstLevel));
+
+            for (int secondLevel = 0; secondLevel < 1000; secondLevel++) {
+                final File file = new File(dir, String.valueOf(secondLevel));
+                paths.add(file.toPath());
+            }
+        }
+
+        final BasicFileAttributes basicFileAttributes = new BasicFileAttributes() {
+            @Override
+            public FileTime lastModifiedTime() {
+                return FileTime.fromMillis(System.currentTimeMillis());
+            }
+
+            @Override
+            public FileTime lastAccessTime() {
+                return FileTime.fromMillis(System.currentTimeMillis());
+            }
+
+            @Override
+            public FileTime creationTime() {
+                return FileTime.fromMillis(System.currentTimeMillis());
+            }
+
+            @Override
+            public boolean isRegularFile() {
+                return false;
+            }
+
+            @Override
+            public boolean isDirectory() {
+                return false;
+            }
+
+            @Override
+            public boolean isSymbolicLink() {
+                return false;
+            }
+
+            @Override
+            public boolean isOther() {
+                return false;
+            }
+
+            @Override
+            public long size() {
+                return 0;
+            }
+
+            @Override
+            public Object fileKey() {
+                return null;
+            }
+        };
+
+        processor = new ListFile() {
+            @Override
+            protected Stream<Path> getPathStream(final Path basePath, final int maxDepth, final BiPredicate<Path, BasicFileAttributes> matcher) throws IOException {
+                return paths.stream()
+                    .filter(path -> matcher.test(path, basicFileAttributes));
+            }
+        };
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, AbstractListProcessor.PRECISION_SECONDS.getValue());
+        runner.setProperty(ListFile.TRACK_PERFORMANCE, "true");
+        runner.setProperty(ListFile.MAX_TRACKED_FILES, "100000");
+        runner.setProperty(ListFile.DIRECTORY, "target");
+
+        runner.run();
+
+        final ListFile.PerformanceTracker tracker = processor.getPerformanceTracker();
+        assertEquals(100_000, tracker.getTrackedFileCount());
+
+        final ListFile.MonitorActiveTasks monitorActiveTasks = new ListFile.MonitorActiveTasks(tracker, runner.getLogger(), 1000, 1000, 1);
+
+        while (tracker.getTrackedFileCount() > 0) {
+            monitorActiveTasks.run();
+        }
+
+        assertEquals(0, tracker.getTrackedFileCount());
     }

+    @Test
     public void testGetPath() {
         runner.setProperty(ListFile.DIRECTORY, "/dir/test1");
@@ -636,14 +724,14 @@

     @Test
     public void testIsListingResetNecessary() throws Exception {
-        assertEquals(true, processor.isListingResetNecessary(ListFile.DIRECTORY));
-        assertEquals(true, processor.isListingResetNecessary(ListFile.RECURSE));
-        assertEquals(true, processor.isListingResetNecessary(ListFile.FILE_FILTER));
-        assertEquals(true, processor.isListingResetNecessary(ListFile.PATH_FILTER));
-        assertEquals(true, processor.isListingResetNecessary(ListFile.MIN_AGE));
-        assertEquals(true, processor.isListingResetNecessary(ListFile.MAX_AGE));
-        assertEquals(true, processor.isListingResetNecessary(ListFile.MIN_SIZE));
-        assertEquals(true, processor.isListingResetNecessary(ListFile.MAX_SIZE));
-        assertEquals(true, processor.isListingResetNecessary(ListFile.IGNORE_HIDDEN_FILES));
+        assertTrue(processor.isListingResetNecessary(ListFile.DIRECTORY));
+        assertTrue(processor.isListingResetNecessary(ListFile.RECURSE));
+        assertTrue(processor.isListingResetNecessary(ListFile.FILE_FILTER));
+        assertTrue(processor.isListingResetNecessary(ListFile.PATH_FILTER));
+        assertTrue(processor.isListingResetNecessary(ListFile.MIN_AGE));
+        assertTrue(processor.isListingResetNecessary(ListFile.MAX_AGE));
+        assertTrue(processor.isListingResetNecessary(ListFile.MIN_SIZE));
+        assertTrue(processor.isListingResetNecessary(ListFile.MAX_SIZE));
+        assertTrue(processor.isListingResetNecessary(ListFile.IGNORE_HIDDEN_FILES));
-        assertEquals(false, processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build()));
+        assertFalse(processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build()));
     }