Repository: nifi Updated Branches: refs/heads/master 4acc9ad28 -> 7fbc23639
NIFI-2956 - GetHDFS - fixed directory path evaluation Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7fbc2363 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7fbc2363 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7fbc2363 Branch: refs/heads/master Commit: 7fbc23639af654fcee72e8ef74340b9f093658d6 Parents: 4acc9ad Author: Pierre Villard <[email protected]> Authored: Thu Oct 27 16:37:29 2016 +0200 Committer: Oleg Zhurakousky <[email protected]> Committed: Wed Nov 2 12:15:26 2016 -0400 ---------------------------------------------------------------------- .../apache/nifi/processors/hadoop/GetHDFS.java | 46 ++++++++------------ 1 file changed, 19 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7fbc2363/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index 7ab7ebe..24de0c4 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -415,7 +415,7 @@ public class GetHDFS extends AbstractHadoopProcessor { try { final FileSystem hdfs = getFileSystem(); // get listing - listing = selectFiles(hdfs, processorConfig.getConfiguredRootDirPath(), null); + listing = selectFiles(hdfs, new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()), null); lastPollTime.set(System.currentTimeMillis()); } 
finally { listingLock.unlock(); @@ -460,7 +460,7 @@ public class GetHDFS extends AbstractHadoopProcessor { if (file.isDirectory() && processorConfig.getRecurseSubdirs()) { files.addAll(selectFiles(hdfs, canonicalFile, filesVisited)); - } else if (!file.isDirectory() && processorConfig.getPathFilter().accept(canonicalFile)) { + } else if (!file.isDirectory() && processorConfig.getPathFilter(dir).accept(canonicalFile)) { final long fileAge = System.currentTimeMillis() - file.getModificationTime(); if (processorConfig.getMinimumAge() < fileAge && fileAge < processorConfig.getMaximumAge()) { files.add(canonicalFile); @@ -480,17 +480,14 @@ public class GetHDFS extends AbstractHadoopProcessor { */ protected static class ProcessorConfiguration { - final private Path configuredRootDirPath; final private Pattern fileFilterPattern; final private boolean ignoreDottedFiles; final private boolean filterMatchBasenameOnly; final private long minimumAge; final private long maximumAge; final private boolean recurseSubdirs; - final private PathFilter pathFilter; ProcessorConfiguration(final ProcessContext context) { - configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()); ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean(); final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue(); fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex); @@ -500,7 +497,22 @@ public class GetHDFS extends AbstractHadoopProcessor { final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); maximumAge = (maxAgeProp == null) ? 
Long.MAX_VALUE : maxAgeProp; recurseSubdirs = context.getProperty(RECURSE_SUBDIRS).asBoolean(); - pathFilter = new PathFilter() { + } + + protected long getMinimumAge() { + return minimumAge; + } + + protected long getMaximumAge() { + return maximumAge; + } + + public boolean getRecurseSubdirs() { + return recurseSubdirs; + } + + protected PathFilter getPathFilter(final Path dir) { + return new PathFilter() { @Override public boolean accept(Path path) { @@ -512,7 +524,7 @@ public class GetHDFS extends AbstractHadoopProcessor { pathToCompare = path.getName(); } else { // figure out portion of path that does not include the provided root dir. - String relativePath = getPathDifference(configuredRootDirPath, path); + String relativePath = getPathDifference(dir, path); if (relativePath.length() == 0) { pathToCompare = path.getName(); } else { @@ -528,25 +540,5 @@ public class GetHDFS extends AbstractHadoopProcessor { }; } - - public Path getConfiguredRootDirPath() { - return configuredRootDirPath; - } - - protected long getMinimumAge() { - return minimumAge; - } - - protected long getMaximumAge() { - return maximumAge; - } - - public boolean getRecurseSubdirs() { - return recurseSubdirs; - } - - protected PathFilter getPathFilter() { - return pathFilter; - } } }
