This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 46605253a5 NIFI-14158: Fixed DeleteHDFS processor ignores delete action return value
46605253a5 is described below
commit 46605253a52a651c6d86a889bea345a25fd247a9
Author: Mark Bathori <[email protected]>
AuthorDate: Wed Jan 15 14:08:16 2025 +0100
NIFI-14158: Fixed DeleteHDFS processor ignores delete action return value
This closes #9634.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../apache/nifi/processors/hadoop/DeleteHDFS.java | 18 +++--
.../nifi/processors/hadoop/TestDeleteHDFS.java | 78 +++++++++++++++++++++-
2 files changed, 90 insertions(+), 6 deletions(-)
diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 5544f2de96..078040b6aa 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -172,11 +172,19 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
attributes.put(getAttributePrefix() + ".path",
path.getParent().toString());
flowFile = session.putAllAttributes(flowFile,
attributes);
- fileSystem.delete(path, isRecursive(context,
session));
- getLogger().debug("For flowfile {} Deleted file at
path {} with name {}", originalFlowFile, path.getParent(), path.getName());
- final Path qualifiedPath =
path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
- flowFile = session.putAttribute(flowFile,
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
-
session.getProvenanceReporter().invokeRemoteProcess(flowFile,
qualifiedPath.toString());
+ boolean success = fileSystem.delete(path,
isRecursive(context, session));
+
+ if (success) {
+ getLogger().debug("For flowfile {} Deleted
file at path {} with name {}", originalFlowFile, path.getParent(),
path.getName());
+ final Path qualifiedPath =
path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+ flowFile = session.putAttribute(flowFile,
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
+
session.getProvenanceReporter().invokeRemoteProcess(flowFile,
qualifiedPath.toString());
+ } else {
+ getLogger().warn("Failed to delete file at
path {} with name {} due to unknown issue, please check related component
logs.", path.getParent(), path.getName());
+ attributes.put(getAttributePrefix() +
".error.message", "Delete action failed due to unknown issue, please check
related component logs.");
+
session.transfer(session.putAllAttributes(session.clone(flowFile), attributes),
getFailureRelationship());
+ failedPath++;
+ }
} catch (IOException ioe) {
if (handleAuthErrors(ioe, session, context, new
GSSExceptionRollbackYieldSessionHandler())) {
return null;
diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index 21391b2287..1ae82fd1f0 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -38,6 +38,7 @@ import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_F
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -56,6 +57,7 @@ public class TestDeleteHDFS {
Path filePath = new Path("/some/path/to/file.txt");
when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
when(mockFileSystem.getUri()).thenReturn(new
URI("hdfs://0.example.com:8020"));
+ when(mockFileSystem.delete(any(Path.class),
anyBoolean())).thenReturn(true);
DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
runner.setIncomingConnection(false);
@@ -82,6 +84,7 @@ public class TestDeleteHDFS {
Path filePath = new Path("/some/path/to/file.txt");
when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
when(mockFileSystem.getUri()).thenReturn(new
URI("hdfs://0.example.com:8020"));
+ when(mockFileSystem.delete(any(Path.class),
anyBoolean())).thenReturn(true);
DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");
@@ -156,7 +159,7 @@ public class TestDeleteHDFS {
}
@Test
- public void testUnsuccessfulDelete() throws Exception {
+ public void testDeleteNotExistingFile() throws Exception {
Path filePath = new Path("/some/path/to/file.txt");
when(mockFileSystem.exists(any(Path.class))).thenReturn(false);
DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
@@ -169,6 +172,22 @@ public class TestDeleteHDFS {
runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
}
+ @Test
+ public void testFailedDelete() throws Exception {
+ final Path filePath = new Path("/some/path/to/file.txt");
+ when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+ when(mockFileSystem.delete(any(Path.class),
anyBoolean())).thenReturn(false);
+ final DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
+ final TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+ runner.setIncomingConnection(false);
+ runner.assertNotValid();
+ runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
+ runner.assertValid();
+ runner.run();
+ runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+ runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
+ }
+
@Test
public void testGlobDelete() throws Exception {
Path glob = new Path("/data/for/2017/08/05/*");
@@ -183,6 +202,7 @@ public class TestDeleteHDFS {
when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
when(mockFileSystem.getUri()).thenReturn(new
URI("hdfs://0.example.com:8020"));
+ when(mockFileSystem.delete(any(Path.class),
anyBoolean())).thenReturn(true);
DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
runner.setIncomingConnection(false);
@@ -193,6 +213,61 @@ public class TestDeleteHDFS {
runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
}
+ @Test
+ public void testFailedGlobDelete() throws Exception {
+ final Path glob = new Path("/data/for/2017/08/05/*");
+ final int fileCount = 10;
+ final FileStatus[] fileStatuses = new FileStatus[fileCount];
+ for (int i = 0; i < fileCount; i++) {
+ final Path file = new Path("/data/for/2017/08/05/file" + i);
+ final FileStatus fileStatus = mock(FileStatus.class);
+ when(fileStatus.getPath()).thenReturn(file);
+ fileStatuses[i] = fileStatus;
+ }
+ when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+
when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+ when(mockFileSystem.getUri()).thenReturn(new
URI("hdfs://0.example.com:8020"));
+ when(mockFileSystem.delete(any(Path.class),
anyBoolean())).thenReturn(false);
+ final DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
+ final TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+ runner.setIncomingConnection(false);
+ runner.assertNotValid();
+ runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
+ runner.assertValid();
+ runner.run();
+ runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+ runner.assertTransferCount(DeleteHDFS.REL_FAILURE, fileCount);
+ }
+
+ @Test
+ public void testMixedGlobDelete() throws Exception {
+ final Path glob = new Path("/data/for/2017/08/05/*");
+ final int fileCount = 3;
+ final FileStatus[] fileStatuses = new FileStatus[fileCount];
+ for (int i = 0; i < fileCount; i++) {
+ final Path file = new Path("/data/for/2017/08/05/file" + i);
+ final FileStatus fileStatus = mock(FileStatus.class);
+ when(fileStatus.getPath()).thenReturn(file);
+ fileStatuses[i] = fileStatus;
+ }
+ when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+
when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+ when(mockFileSystem.getUri()).thenReturn(new
URI("hdfs://0.example.com:8020"));
+ when(mockFileSystem.delete(any(Path.class), anyBoolean()))
+ .thenReturn(false)
+ .thenReturn(true)
+ .thenReturn(false);
+ final DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
+ final TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+ runner.setIncomingConnection(false);
+ runner.assertNotValid();
+ runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
+ runner.assertValid();
+ runner.run();
+ runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+ runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 2);
+ }
+
@Test
public void testGlobDeleteFromIncomingFlowFile() throws Exception {
Path glob = new Path("/data/for/2017/08/05/*");
@@ -207,6 +282,7 @@ public class TestDeleteHDFS {
when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
when(mockFileSystem.getUri()).thenReturn(new
URI("hdfs://0.example.com:8020"));
+ when(mockFileSystem.delete(any(Path.class),
anyBoolean())).thenReturn(true);
DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
runner.setIncomingConnection(true);