Repository: nifi
Updated Branches:
  refs/heads/master 8b5342dec -> 7843b885e

NIFI-4144 - added min/max age to ListHDFS processor

This closes #1966.

Signed-off-by: Tony Kurc <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7843b885
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7843b885
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7843b885

Branch: refs/heads/master
Commit: 7843b885ee061880d9ef7a4f3e1a39ad8ca40d00
Parents: 8b5342d
Author: Pierre Villard <[email protected]>
Authored: Fri Jun 30 18:55:37 2017 +0200
Committer: Tony Kurc <[email protected]>
Committed: Tue Jul 4 21:18:43 2017 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/ListHDFS.java | 58 ++++++++++++++++-
 .../nifi/processors/hadoop/TestListHDFS.java    | 68 ++++++++++++++++++++
 2 files changed, 123 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/7843b885/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index a705ee8..14d057d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -33,6 +33,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; @@ -48,6 +50,7 @@ import org.apache.nifi.processor.util.StandardValidators; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -58,7 +61,6 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; - @TriggerSerially @TriggerWhenEmpty @InputRequirement(Requirement.INPUT_FORBIDDEN) @@ -114,6 +116,24 @@ public class ListHDFS extends AbstractHadoopProcessor { .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); + public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder() + .name("minimum-file-age") + .displayName("Minimum File Age") + .description("The minimum age that a file must be in order to be pulled; any file younger than this " + + "amount of time (based on last modification date) will be ignored") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) + .build(); + + public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder() + .name("maximum-file-age") + .displayName("Maximum File Age") + .description("The maximum age that a file must be in order to be pulled; any file older than this " + + "amount of time (based on last modification date) will be ignored. 
Minimum value is 100ms.") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All FlowFiles are transferred to this relationship") @@ -144,6 +164,8 @@ public class ListHDFS extends AbstractHadoopProcessor { props.add(DIRECTORY); props.add(RECURSE_SUBDIRS); props.add(FILE_FILTER); + props.add(MIN_AGE); + props.add(MAX_AGE); return props; } @@ -154,6 +176,23 @@ public class ListHDFS extends AbstractHadoopProcessor { return relationships; } + @Override + protected Collection<ValidationResult> customValidate(ValidationContext context) { + final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context)); + + final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp; + final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp; + + if (minimumAge > maximumAge) { + problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration") + .explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build()); + } + + return problems; + } + protected String getKey(final String directory) { return getIdentifier() + ".lastListingTime." + directory; } @@ -171,18 +210,31 @@ public class ListHDFS extends AbstractHadoopProcessor { * Determines which of the given FileStatus's describes a File that should be listed. 
* * @param statuses the eligible FileStatus objects that we could potentially list + * @param context processor context with properties values * @return a Set containing only those FileStatus objects that we want to list */ - Set<FileStatus> determineListable(final Set<FileStatus> statuses) { + Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) { final long minTimestamp = this.latestTimestampListed; final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>(); + final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in + // the future relative to the nifi instance, files are not skipped. + final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp; + final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp; + // Build a sorted map to determine the latest possible entries for (final FileStatus status : statuses) { if (status.getPath().getName().endsWith("_COPYING_")) { continue; } + final long fileAge = System.currentTimeMillis() - status.getModificationTime(); + if (minimumAge > fileAge || fileAge > maximumAge) { + continue; + } + final long entityTimestamp = status.getModificationTime(); if (entityTimestamp > latestTimestampListed) { @@ -293,7 +345,7 @@ public class ListHDFS extends AbstractHadoopProcessor { return; } - final Set<FileStatus> listable = determineListable(statuses); + final Set<FileStatus> listable = determineListable(statuses, context); getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()}); for (final FileStatus status : listable) { 
http://git-wip-us.apache.org/repos/asf/nifi/blob/7843b885/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index f0fce5a..f176a5f 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -44,6 +44,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -313,6 +314,73 @@ public class TestListHDFS { runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 5); } + @Test + public void testMinAgeMaxAge() throws IOException, InterruptedException { + long now = new Date().getTime(); + long oneHourAgo = now - 3600000; + long twoHoursAgo = now - 2*3600000; + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, now, now, create777(), "owner", "group", new Path("/test/willBeIgnored.txt"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, now-5, now-5, create777(), "owner", "group", new Path("/test/testFile.txt"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, oneHourAgo, oneHourAgo, create777(), "owner", "group", new Path("/test/testFile1.txt"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, twoHoursAgo, twoHoursAgo, create777(), "owner", "group", new 
Path("/test/testFile2.txt"))); + + // all files + runner.run(); + runner.assertValid(); + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3); + runner.clearTransferState(); + runner.getStateManager().clear(Scope.CLUSTER); + + // invalid min_age > max_age + runner.setProperty(ListHDFS.MIN_AGE, "30 sec"); + runner.setProperty(ListHDFS.MAX_AGE, "1 sec"); + runner.assertNotValid(); + + // only one file (one hour ago) + runner.setProperty(ListHDFS.MIN_AGE, "30 sec"); + runner.setProperty(ListHDFS.MAX_AGE, "90 min"); + runner.assertValid(); + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); // will ignore the file for this cycle + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + // Next iteration should pick up the file, since nothing else was added. + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0).assertAttributeEquals("filename", "testFile1.txt"); + runner.clearTransferState(); + runner.getStateManager().clear(Scope.CLUSTER); + + // two files (one hour ago and two hours ago) + runner.setProperty(ListHDFS.MIN_AGE, "30 sec"); + runner.removeProperty(ListHDFS.MAX_AGE); + runner.assertValid(); + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); + runner.clearTransferState(); + runner.getStateManager().clear(Scope.CLUSTER); + + // two files (now and one hour ago) + runner.setProperty(ListHDFS.MIN_AGE, "0 sec"); + runner.setProperty(ListHDFS.MAX_AGE, "90 min"); + runner.assertValid(); + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); + } + private FsPermission create777() { return new FsPermission((short) 0777);
