Repository: nifi
Updated Branches:
  refs/heads/master 6279fd418 -> 2d58497c2


NIFI-2859 - This closes #1383. Ignore files starting with a dot in ListHDFS


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2d58497c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2d58497c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2d58497c

Branch: refs/heads/master
Commit: 2d58497c2e888e7a28bc28b4f7a091c46cf70c31
Parents: 6279fd4
Author: Pierre Villard <[email protected]>
Authored: Tue Jan 3 16:22:11 2017 +0100
Committer: joewitt <[email protected]>
Committed: Tue Feb 14 23:21:44 2017 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/ListHDFS.java | 42 ++++++++++++--------
 .../nifi/processors/hadoop/TestListHDFS.java    | 17 ++++++++
 2 files changed, 43 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2d58497c/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 2ae65b2..38a16e4 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -42,11 +43,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.hadoop.util.HDFSListing;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
+import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.File;
 import java.io.IOException;
@@ -59,6 +56,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 
 @TriggerSerially
@@ -105,6 +103,13 @@ public class ListHDFS extends AbstractHadoopProcessor {
         .defaultValue("true")
         .build();
 
+    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
+        .name("File Filter")
+        .description("Only files whose names match the given regular expression will be picked up")
+        .required(true)
+        .defaultValue("[^\\.].*")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -135,6 +140,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         props.add(DISTRIBUTED_CACHE_SERVICE);
         props.add(DIRECTORY);
         props.add(RECURSE_SUBDIRS);
+        props.add(FILE_FILTER);
         return props;
     }
 
@@ -152,18 +158,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
-        if (isConfigurationRestored() && descriptor.equals(DIRECTORY)) {
+        if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
             latestTimestampEmitted = -1L;
             latestTimestampListed = -1L;
         }
     }
 
-    private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
-        final ObjectMapper mapper = new ObjectMapper();
-        final JsonNode jsonNode = mapper.readTree(serializedState);
-        return mapper.readValue(jsonNode, HDFSListing.class);
-    }
-
     /**
      * Determines which of the given FileStatus's describes a File that should be listed.
      *
@@ -283,7 +283,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         final Set<FileStatus> statuses;
         try {
             final Path rootPath = new Path(directory);
-            statuses = getStatuses(rootPath, recursive, hdfs);
+            statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context));
             getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
         } catch (final IOException | IllegalArgumentException e) {
             getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
@@ -326,17 +326,17 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
     }
 
-    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
+    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException {
         final Set<FileStatus> statusSet = new HashSet<>();
 
         getLogger().debug("Fetching listing for {}", new Object[] {path});
-        final FileStatus[] statuses = hdfs.listStatus(path);
+        final FileStatus[] statuses = hdfs.listStatus(path, filter);
 
         for ( final FileStatus status : statuses ) {
             if ( status.isDirectory() ) {
                 if ( recursive ) {
                     try {
-                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
+                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter));
                     } catch (final IOException ioe) {
                         getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
                     }
@@ -394,4 +394,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
         return sb.toString();
     }
+
+    private PathFilter createPathFilter(final ProcessContext context) {
+        final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
+        return new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return filePattern.matcher(path.getName()).matches();
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2d58497c/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 8f8699e..bdb058e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -102,6 +102,23 @@ public class TestListHDFS {
     }
 
     @Test
+    public void testListingWithFilter() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):substring(0,5)}");
+        runner.setProperty(ListHDFS.FILE_FILTER, "[^test].*");
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+    }
+
+    @Test
     public void testListingWithInvalidELFunction() throws InterruptedException {
         runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):foo()}");
         runner.assertNotValid();

Reply via email to