Repository: nifi
Updated Branches:
  refs/heads/master 4bfb905f3 -> 61c799d88


NIFI-3204 This closes #1561. Fix handling of deleting a path with a wildcard when
the processor is invoked via an incoming flowfile.
Applied Joseph Witt's patch from the NIFI-3204 JIRA.

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/61c799d8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/61c799d8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/61c799d8

Branch: refs/heads/master
Commit: 61c799d88b53c72cd38fad820284ecd115a6cf1b
Parents: 4bfb905
Author: Francois Prunier <[email protected]>
Authored: Fri Mar 3 11:41:42 2017 +0100
Committer: joewitt <[email protected]>
Committed: Mon Apr 10 08:51:10 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/hadoop/DeleteHDFS.java      | 59 +++++++++-----------
 .../nifi/processors/hadoop/TestDeleteHDFS.java  | 55 +++++++++---------
 2 files changed, 56 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/61c799d8/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index ed4d10d..cdabc80 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.hadoop;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,33 +38,38 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 
 @TriggerWhenEmpty
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem", "restricted" })
-@CapabilityDescription("Deletes a file from HDFS. The file can be provided as 
an attribute from an incoming FlowFile, "
-        + "or a statically set file that is periodically removed. If this 
processor has an incoming connection, it"
+@Tags({"hadoop", "HDFS", "delete", "remove", "filesystem", "restricted"})
+@CapabilityDescription("Deletes one or more files or directories from HDFS. 
The path can be provided as an attribute from an incoming FlowFile, "
+        + "or a statically set path that is periodically removed. If this 
processor has an incoming connection, it"
         + "will ignore running on a periodic basis and instead rely on 
incoming FlowFiles to trigger a delete. "
-        + "Optionally, you may specify use a wildcard character to match 
multiple files or directories.")
+        + "Note that you may use a wildcard character to match multiple files 
or directories. If there are"
+        + " no incoming connections no flowfiles will be transfered to any 
output relationships.  If there is an incoming"
+        + " flowfile then provided there are no detected failures it will be 
transferred to success otherwise it will be sent to false. If"
+        + " knowledge of globbed files deleted is necessary use ListHDFS first 
to produce a specific list of files to delete. ")
 @Restricted("Provides operator the ability to delete any file that NiFi has 
access to in HDFS or the local filesystem.")
+@SeeAlso({ListHDFS.class})
 public class DeleteHDFS extends AbstractHadoopProcessor {
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
-            .description("FlowFiles will be routed here if the delete command 
was successful")
+            .description("When an incoming flowfile is used then if there are 
no errors invoking delete the flowfile will route here.")
             .build();
 
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("FlowFiles will be routed here if the delete command 
was unsuccessful")
+            .description("When an incoming flowfile is used and there is a 
failure while deleting then the flowfile will route here.")
             .build();
 
     public static final PropertyDescriptor FILE_OR_DIRECTORY = new 
PropertyDescriptor.Builder()
             .name("file_or_directory")
-            .displayName("File or Directory")
+            .displayName("Path")
             .description("The HDFS file or directory to delete. A wildcard 
expression may be used to only delete certain files")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -109,20 +113,20 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        String fileOrDirectoryName = null;
-        FlowFile flowFile = session.get();
+        final FlowFile originalFlowFile = session.get();
 
         // If this processor has an incoming connection, then do not run 
unless a
         // FlowFile is actually sent through
-        if (flowFile == null && context.hasIncomingConnection()) {
+        if (originalFlowFile == null && context.hasIncomingConnection()) {
             context.yield();
             return;
         }
 
-        if (flowFile != null) {
-            fileOrDirectoryName = 
context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-        } else {
+        final String fileOrDirectoryName;
+        if (originalFlowFile == null) {
             fileOrDirectoryName = 
context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue();
+        } else {
+            fileOrDirectoryName = 
context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(originalFlowFile).getValue();
         }
 
         final FileSystem fileSystem = getFileSystem();
@@ -140,30 +144,21 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
                 pathList.add(new Path(fileOrDirectoryName));
             }
 
-            Map<String, String> attributes = 
Maps.newHashMapWithExpectedSize(2);
             for (Path path : pathList) {
-                attributes.put("filename", path.getName());
-                attributes.put("path", path.getParent().toString());
                 if (fileSystem.exists(path)) {
                     fileSystem.delete(path, 
context.getProperty(RECURSIVE).asBoolean());
-                    if (!context.hasIncomingConnection()) {
-                        flowFile = session.create();
-                    }
-                    session.transfer(session.putAllAttributes(flowFile, 
attributes), REL_SUCCESS);
-                } else {
-                    getLogger().warn("File (" + path + ") does not exist");
-                    if (!context.hasIncomingConnection()) {
-                        flowFile = session.create();
-                    }
-                    session.transfer(session.putAllAttributes(flowFile, 
attributes), REL_FAILURE);
+                    getLogger().debug("For flowfile {} Deleted file at path {} 
with name {}", new Object[]{originalFlowFile, path.getParent().toString(), 
path.getName()});
                 }
             }
+            if (originalFlowFile != null) {
+                session.transfer(originalFlowFile, DeleteHDFS.REL_SUCCESS);
+            }
         } catch (IOException e) {
-            getLogger().warn("Error processing delete for file or directory", 
e);
-            if (flowFile != null) {
-                session.rollback(true);
+            if (originalFlowFile != null) {
+                getLogger().error("Error processing delete for flowfile {} due 
to {}", new Object[]{originalFlowFile, e.getMessage()}, e);
+                session.transfer(originalFlowFile, DeleteHDFS.REL_FAILURE);
             }
         }
-    }
 
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/61c799d8/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index 0cb371c..be16ac6 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
@@ -25,15 +24,12 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -55,6 +51,7 @@ public class TestDeleteHDFS {
         mockFileSystem = mock(FileSystem.class);
     }
 
+    //Tests the case where a file is found and deleted but there was no 
incoming connection
     @Test
     public void testSuccessfulDelete() throws Exception {
         Path filePath = new Path("/some/path/to/file.txt");
@@ -66,11 +63,8 @@ public class TestDeleteHDFS {
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
         runner.assertValid();
         runner.run();
-        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
-        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
-        FlowFile flowFile = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
-        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
-        assertEquals(filePath.getParent().toString(), 
flowFile.getAttribute("path"));
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
     }
 
     @Test
@@ -86,9 +80,6 @@ public class TestDeleteHDFS {
         runner.run();
         runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
         runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
-        FlowFile flowFile = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
-        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
-        assertEquals(filePath.getParent().toString(), 
flowFile.getAttribute("path"));
     }
 
     @Test
@@ -102,9 +93,7 @@ public class TestDeleteHDFS {
         attributes.put("hdfs.file", filePath.toString());
         runner.enqueue("foo", attributes);
         runner.run();
-        runner.assertQueueNotEmpty();
-        runner.assertPenalizeCount(1);
-        assertEquals(1, runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
     }
 
     @Test
@@ -131,11 +120,7 @@ public class TestDeleteHDFS {
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
         runner.assertValid();
         runner.run();
-        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_FAILURE);
-        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
-        FlowFile flowFile = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_FAILURE).get(0);
-        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
-        assertEquals(filePath.getParent().toString(), 
flowFile.getAttribute("path"));
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
     }
 
     @Test
@@ -158,14 +143,32 @@ public class TestDeleteHDFS {
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
         runner.assertValid();
         runner.run();
-        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
-        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, fileCount);
-        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testGlobDeleteFromIncomingFlowFile() throws Exception {
+        Path glob = new Path("/data/for/2017/08/05/*");
+        int fileCount = 300;
+        FileStatus[] fileStatuses = new FileStatus[fileCount];
         for (int i = 0; i < fileCount; i++) {
-            FlowFile flowFile = flowFiles.get(i);
-            assertEquals("file" + i, flowFile.getAttribute("filename"));
-            assertEquals("/data/for/2017/08/05", 
flowFile.getAttribute("path"));
+            Path file = new Path("/data/for/2017/08/05/file" + i);
+            FileStatus fileStatus = mock(FileStatus.class);
+            when(fileStatus.getPath()).thenReturn(file);
+            fileStatuses[i] = fileStatus;
         }
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        
when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, 
mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(true);
+        Map<String, String> attributes = Maps.newHashMap();
+        runner.enqueue("foo", attributes);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
     }
 
     private static class TestableDeleteHDFS extends DeleteHDFS {

Reply via email to