This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 46605253a5 NIFI-14158: Fixed DeleteHDFS processor ignores delete action return value
46605253a5 is described below

commit 46605253a52a651c6d86a889bea345a25fd247a9
Author: Mark Bathori <[email protected]>
AuthorDate: Wed Jan 15 14:08:16 2025 +0100

    NIFI-14158: Fixed DeleteHDFS processor ignores delete action return value
    
    This closes #9634.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../apache/nifi/processors/hadoop/DeleteHDFS.java  | 18 +++--
 .../nifi/processors/hadoop/TestDeleteHDFS.java     | 78 +++++++++++++++++++++-
 2 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 5544f2de96..078040b6aa 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -172,11 +172,19 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
                             attributes.put(getAttributePrefix() + ".path", 
path.getParent().toString());
                             flowFile = session.putAllAttributes(flowFile, 
attributes);
 
-                            fileSystem.delete(path, isRecursive(context, 
session));
-                            getLogger().debug("For flowfile {} Deleted file at 
path {} with name {}", originalFlowFile, path.getParent(), path.getName());
-                            final Path qualifiedPath = 
path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
-                            flowFile = session.putAttribute(flowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
-                            
session.getProvenanceReporter().invokeRemoteProcess(flowFile, 
qualifiedPath.toString());
+                            boolean success = fileSystem.delete(path, 
isRecursive(context, session));
+
+                            if (success) {
+                                getLogger().debug("For flowfile {} Deleted 
file at path {} with name {}", originalFlowFile, path.getParent(), 
path.getName());
+                                final Path qualifiedPath = 
path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                                flowFile = session.putAttribute(flowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
+                                
session.getProvenanceReporter().invokeRemoteProcess(flowFile, 
qualifiedPath.toString());
+                            } else {
+                                getLogger().warn("Failed to delete file at 
path {} with name {} due to unknown issue, please check related component 
logs.", path.getParent(), path.getName());
+                                attributes.put(getAttributePrefix() + 
".error.message", "Delete action failed due to unknown issue, please check 
related component logs.");
+                                
session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), 
getFailureRelationship());
+                                failedPath++;
+                            }
                         } catch (IOException ioe) {
                             if (handleAuthErrors(ioe, session, context, new 
GSSExceptionRollbackYieldSessionHandler())) {
                                 return null;
diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index 21391b2287..1ae82fd1f0 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -38,6 +38,7 @@ import static 
org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_F
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -56,6 +57,7 @@ public class TestDeleteHDFS {
         Path filePath = new Path("/some/path/to/file.txt");
         when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
         when(mockFileSystem.getUri()).thenReturn(new 
URI("hdfs://0.example.com:8020"));
+        when(mockFileSystem.delete(any(Path.class), 
anyBoolean())).thenReturn(true);
         DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
         runner.setIncomingConnection(false);
@@ -82,6 +84,7 @@ public class TestDeleteHDFS {
         Path filePath = new Path("/some/path/to/file.txt");
         when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
         when(mockFileSystem.getUri()).thenReturn(new 
URI("hdfs://0.example.com:8020"));
+        when(mockFileSystem.delete(any(Path.class), 
anyBoolean())).thenReturn(true);
         DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");
@@ -156,7 +159,7 @@ public class TestDeleteHDFS {
     }
 
     @Test
-    public void testUnsuccessfulDelete() throws Exception {
+    public void testDeleteNotExistingFile() throws Exception {
         Path filePath = new Path("/some/path/to/file.txt");
         when(mockFileSystem.exists(any(Path.class))).thenReturn(false);
         DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
@@ -169,6 +172,22 @@ public class TestDeleteHDFS {
         runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
     }
 
+    @Test
+    public void testFailedDelete() throws Exception {
+        final Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        when(mockFileSystem.delete(any(Path.class), 
anyBoolean())).thenReturn(false);
+        final DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(false);
+        runner.assertNotValid();
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
+    }
+
     @Test
     public void testGlobDelete() throws Exception {
         Path glob = new Path("/data/for/2017/08/05/*");
@@ -183,6 +202,7 @@ public class TestDeleteHDFS {
         when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
         
when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
         when(mockFileSystem.getUri()).thenReturn(new 
URI("hdfs://0.example.com:8020"));
+        when(mockFileSystem.delete(any(Path.class), 
anyBoolean())).thenReturn(true);
         DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
         runner.setIncomingConnection(false);
@@ -193,6 +213,61 @@ public class TestDeleteHDFS {
         runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
     }
 
+    @Test
+    public void testFailedGlobDelete() throws Exception {
+        final Path glob = new Path("/data/for/2017/08/05/*");
+        final int fileCount = 10;
+        final FileStatus[] fileStatuses = new FileStatus[fileCount];
+        for (int i = 0; i < fileCount; i++) {
+            final Path file = new Path("/data/for/2017/08/05/file" + i);
+            final FileStatus fileStatus = mock(FileStatus.class);
+            when(fileStatus.getPath()).thenReturn(file);
+            fileStatuses[i] = fileStatus;
+        }
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        
when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        when(mockFileSystem.getUri()).thenReturn(new 
URI("hdfs://0.example.com:8020"));
+        when(mockFileSystem.delete(any(Path.class), 
anyBoolean())).thenReturn(false);
+        final DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(false);
+        runner.assertNotValid();
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, fileCount);
+    }
+
+    @Test
+    public void testMixedGlobDelete() throws Exception {
+        final Path glob = new Path("/data/for/2017/08/05/*");
+        final int fileCount = 3;
+        final FileStatus[] fileStatuses = new FileStatus[fileCount];
+        for (int i = 0; i < fileCount; i++) {
+            final Path file = new Path("/data/for/2017/08/05/file" + i);
+            final FileStatus fileStatus = mock(FileStatus.class);
+            when(fileStatus.getPath()).thenReturn(file);
+            fileStatuses[i] = fileStatus;
+        }
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        
when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        when(mockFileSystem.getUri()).thenReturn(new 
URI("hdfs://0.example.com:8020"));
+        when(mockFileSystem.delete(any(Path.class), anyBoolean()))
+                .thenReturn(false)
+                .thenReturn(true)
+                .thenReturn(false);
+        final DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(false);
+        runner.assertNotValid();
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 2);
+    }
+
     @Test
     public void testGlobDeleteFromIncomingFlowFile() throws Exception {
         Path glob = new Path("/data/for/2017/08/05/*");
@@ -207,6 +282,7 @@ public class TestDeleteHDFS {
         when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
         
when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
         when(mockFileSystem.getUri()).thenReturn(new 
URI("hdfs://0.example.com:8020"));
+        when(mockFileSystem.delete(any(Path.class), 
anyBoolean())).thenReturn(true);
         DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
         runner.setIncomingConnection(true);

Reply via email to