Repository: nifi
Updated Branches:
  refs/heads/master 8b5342dec -> 7843b885e


NIFI-4144 - added min/max age to ListHDFS processor

This closes #1966.

Signed-off-by: Tony Kurc <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7843b885
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7843b885
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7843b885

Branch: refs/heads/master
Commit: 7843b885ee061880d9ef7a4f3e1a39ad8ca40d00
Parents: 8b5342d
Author: Pierre Villard <[email protected]>
Authored: Fri Jun 30 18:55:37 2017 +0200
Committer: Tony Kurc <[email protected]>
Committed: Tue Jul 4 21:18:43 2017 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/ListHDFS.java | 58 ++++++++++++++++-
 .../nifi/processors/hadoop/TestListHDFS.java    | 68 ++++++++++++++++++++
 2 files changed, 123 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7843b885/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index a705ee8..14d057d 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -33,6 +33,8 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@@ -48,6 +50,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -58,7 +61,6 @@ import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
-
 @TriggerSerially
 @TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -114,6 +116,24 @@ public class ListHDFS extends AbstractHadoopProcessor {
         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
         .build();
 
+    public static final PropertyDescriptor MIN_AGE = new 
PropertyDescriptor.Builder()
+        .name("minimum-file-age")
+        .displayName("Minimum File Age")
+        .description("The minimum age that a file must be in order to be 
pulled; any file younger than this "
+                + "amount of time (based on last modification date) will be 
ignored")
+        .required(false)
+        .addValidator(StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+        .build();
+
+    public static final PropertyDescriptor MAX_AGE = new 
PropertyDescriptor.Builder()
+        .name("maximum-file-age")
+        .displayName("Maximum File Age")
+        .description("The maximum age that a file can be in order to be 
pulled; any file older than this "
+                + "amount of time (based on last modification date) will be 
ignored. Minimum value is 100ms.")
+        .required(false)
+        .addValidator(StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+        .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
         .description("All FlowFiles are transferred to this relationship")
@@ -144,6 +164,8 @@ public class ListHDFS extends AbstractHadoopProcessor {
         props.add(DIRECTORY);
         props.add(RECURSE_SUBDIRS);
         props.add(FILE_FILTER);
+        props.add(MIN_AGE);
+        props.add(MAX_AGE);
         return props;
     }
 
@@ -154,6 +176,23 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return relationships;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
+        final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(context));
+
+        final Long minAgeProp = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeProp = 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
+        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : 
maxAgeProp;
+
+        if (minimumAge > maximumAge) {
+            problems.add(new 
ValidationResult.Builder().valid(false).subject("ListHDFS Configuration")
+                    .explanation(MIN_AGE.getName() + " cannot be greater than 
" + MAX_AGE.getName()).build());
+        }
+
+        return problems;
+    }
+
     protected String getKey(final String directory) {
         return getIdentifier() + ".lastListingTime." + directory;
     }
@@ -171,18 +210,31 @@ public class ListHDFS extends AbstractHadoopProcessor {
      * Determines which of the given FileStatus's describes a File that should 
be listed.
      *
      * @param statuses the eligible FileStatus objects that we could 
potentially list
+     * @param context processor context with properties values
      * @return a Set containing only those FileStatus objects that we want to 
list
      */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses) {
+    Set<FileStatus> determineListable(final Set<FileStatus> statuses, 
ProcessContext context) {
         final long minTimestamp = this.latestTimestampListed;
         final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
 
+        final Long minAgeProp = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        // NIFI-4144 - setting to MIN_VALUE so that in case the file 
modification time is in
+        // the future relative to the nifi instance, files are not skipped.
+        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : 
minAgeProp;
+        final Long maxAgeProp = 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : 
maxAgeProp;
+
         // Build a sorted map to determine the latest possible entries
         for (final FileStatus status : statuses) {
             if (status.getPath().getName().endsWith("_COPYING_")) {
                 continue;
             }
 
+            final long fileAge = System.currentTimeMillis() - 
status.getModificationTime();
+            if (minimumAge > fileAge || fileAge > maximumAge) {
+                continue;
+            }
+
             final long entityTimestamp = status.getModificationTime();
 
             if (entityTimestamp > latestTimestampListed) {
@@ -293,7 +345,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        final Set<FileStatus> listable = determineListable(statuses);
+        final Set<FileStatus> listable = determineListable(statuses, context);
         getLogger().debug("Of the {} files found in HDFS, {} are listable", 
new Object[] {statuses.size(), listable.size()});
 
         for (final FileStatus status : listable) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7843b885/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index f0fce5a..f176a5f 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -44,6 +44,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -313,6 +314,73 @@ public class TestListHDFS {
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 5);
     }
 
+    @Test
+    public void testMinAgeMaxAge() throws IOException, InterruptedException {
+        long now = new Date().getTime();
+        long oneHourAgo = now - 3600000;
+        long twoHoursAgo = now - 2*3600000;
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, now, now, create777(), "owner", "group", new 
Path("/test/willBeIgnored.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, now-5, now-5, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, oneHourAgo, oneHourAgo, create777(), "owner", "group", new 
Path("/test/testFile1.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, twoHoursAgo, twoHoursAgo, create777(), "owner", "group", new 
Path("/test/testFile2.txt")));
+
+        // all files
+        runner.run();
+        runner.assertValid();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
+        runner.clearTransferState();
+        runner.getStateManager().clear(Scope.CLUSTER);
+
+        // invalid min_age > max_age
+        runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
+        runner.setProperty(ListHDFS.MAX_AGE, "1 sec");
+        runner.assertNotValid();
+
+        // only one file (one hour ago)
+        runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
+        runner.setProperty(ListHDFS.MAX_AGE, "90 min");
+        runner.assertValid();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        runner.run(); // will ignore the file for this cycle
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        // Next iteration should pick up the file, since nothing else was 
added.
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0).assertAttributeEquals("filename",
 "testFile1.txt");
+        runner.clearTransferState();
+        runner.getStateManager().clear(Scope.CLUSTER);
+
+        // two files (one hour ago and two hours ago)
+        runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
+        runner.removeProperty(ListHDFS.MAX_AGE);
+        runner.assertValid();
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+        runner.clearTransferState();
+        runner.getStateManager().clear(Scope.CLUSTER);
+
+        // two files (now and one hour ago)
+        runner.setProperty(ListHDFS.MIN_AGE, "0 sec");
+        runner.setProperty(ListHDFS.MAX_AGE, "90 min");
+        runner.assertValid();
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+    }
+
 
     private FsPermission create777() {
         return new FsPermission((short) 0777);

Reply via email to