This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6fd0b05319 NIFI-15334 Added support for deleting a directory in
DeleteSFTP (#10684)
6fd0b05319 is described below
commit 6fd0b05319366b2823301b01d090b857b9c71853
Author: dan-s1 <[email protected]>
AuthorDate: Tue Dec 23 16:56:10 2025 -0500
NIFI-15334 Added support for deleting a directory in DeleteSFTP (#10684)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/processors/standard/DeleteSFTP.java | 87 +++++++++++++++++-----
.../nifi/processors/standard/TestDeleteSFTP.java | 40 +++++++++-
2 files changed, 104 insertions(+), 23 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeleteSFTP.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeleteSFTP.java
index dccff9e00b..4e778d69d0 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeleteSFTP.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeleteSFTP.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -65,6 +66,33 @@ import java.util.concurrent.TimeUnit;
"""
)
public class DeleteSFTP extends AbstractProcessor {
+ public enum RemovalStrategy implements DescribedValue {
+ FILE("File", "Specify a file to delete"),
+ DIRECTORY("Directory", "Specify an empty directory to delete");
+
+ RemovalStrategy(String displayName, String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ private final String displayName;
+ private final String description;
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+ }
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -79,26 +107,37 @@ public class DeleteSFTP extends AbstractProcessor {
.description("All FlowFiles, for which an existing file could not
be deleted, are routed to this relationship")
.build();
- private final static Set<Relationship> relationships = Set.of(REL_SUCCESS,
REL_NOT_FOUND, REL_FAILURE);
+ private final static Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS,
REL_NOT_FOUND, REL_FAILURE);
+
+ public static final PropertyDescriptor REMOVAL_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Removal Strategy")
+ .description("Specifies whether to delete a file or a directory")
+ .allowableValues(RemovalStrategy.class)
+ .defaultValue(RemovalStrategy.FILE)
+ .required(true)
+ .build();
public static final PropertyDescriptor DIRECTORY_PATH = new
PropertyDescriptor.Builder()
.name("Directory Path")
- .description("The path to the directory the file to delete is
located in.")
+ .description("The path to the the actual directory to delete or
the path to the directory the file to delete is located in.")
.required(true)
.defaultValue("${" + CoreAttributes.PATH.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
+
public static final PropertyDescriptor FILENAME = new
PropertyDescriptor.Builder()
.name("Filename")
.description("The name of the file to delete.")
+ .dependsOn(REMOVAL_STRATEGY, RemovalStrategy.FILE)
.required(true)
.defaultValue("${" + CoreAttributes.FILENAME.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- private final static List<PropertyDescriptor> properties = List.of(
+ private final static List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
+ REMOVAL_STRATEGY,
DIRECTORY_PATH,
FILENAME,
SFTPTransfer.HOSTNAME,
@@ -124,12 +163,12 @@ public class DeleteSFTP extends AbstractProcessor {
@Override
public Set<Relationship> getRelationships() {
- return relationships;
+ return RELATIONSHIPS;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
+ return PROPERTY_DESCRIPTORS;
}
@Override
@@ -147,6 +186,7 @@ public class DeleteSFTP extends AbstractProcessor {
}
final ComponentLog logger = getLogger();
+ final RemovalStrategy removalStrategy =
context.getProperty(REMOVAL_STRATEGY).asAllowableValue(RemovalStrategy.class);
String hostname =
context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
final int maxNumberOfFiles =
context.getProperty(SFTPTransfer.BATCH_SIZE).asInteger();
@@ -156,36 +196,45 @@ public class DeleteSFTP extends AbstractProcessor {
do {
//evaluate again inside the loop as each flowfile can have a
different hostname
hostname =
context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
-
final long startNanos = System.nanoTime();
-
final String directoryPathProperty =
context.getProperty(DIRECTORY_PATH).evaluateAttributeExpressions(flowFile).getValue();
- final String filename =
context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+ String filename = null;
try {
final Path directoryPath =
Paths.get(directoryPathProperty).normalize();
- final Path filePath =
directoryPath.resolve(filename).normalize();
+ final String transitUri;
- if (!directoryPath.equals(filePath.getParent())) {
- final String errorMessage = "Attempting to delete file
at path '%s' which is not a direct child of the directory '%s'"
- .formatted(filePath, directoryPath);
+ if (removalStrategy == RemovalStrategy.DIRECTORY) {
+ transfer.deleteDirectory(flowFile,
directoryPath.toString());
+ transitUri = "sftp://%s".formatted(directoryPath);
+ } else {
+ filename =
context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+ final Path filePath =
directoryPath.resolve(filename).normalize();
- handleFailure(session, flowFile, errorMessage, null);
- continue;
- }
+ if (!directoryPath.equals(filePath.getParent())) {
+ final String errorMessage = "Attempting to delete
file at path '%s' which is not a direct child of the directory '%s'"
+ .formatted(filePath, directoryPath);
- transfer.deleteFile(flowFile, directoryPath.toString(),
filename);
+ handleFailure(session, flowFile, errorMessage,
null);
+ continue;
+ }
+ transfer.deleteFile(flowFile,
directoryPath.toString(), filename);
+ transitUri = "sftp://%s".formatted(filePath);
+ }
session.transfer(flowFile, REL_SUCCESS);
- final String transitUri = "sftp://%s".formatted(filePath);
final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
logger.debug("Successfully deleted file at path {} in {}
millis; routing to success", flowFile, transferMillis);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUri,
"Object deleted");
} catch (FileNotFoundException fileNotFoundException) {
session.transfer(flowFile, REL_NOT_FOUND);
} catch (IOException ioException) {
- final String errorMessage = "Failed to delete file '%s' in
directory '%s'"
- .formatted(filename, directoryPathProperty);
+ final String errorMessage;
+ if (removalStrategy == RemovalStrategy.DIRECTORY) {
+ errorMessage = "Failed to delete directory
'%s'".formatted(directoryPathProperty);
+ } else {
+ errorMessage = "Failed to delete file '%s' in
directory '%s'".formatted(filename, directoryPathProperty);
+ }
handleFailure(session, flowFile, errorMessage,
ioException);
}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeleteSFTP.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeleteSFTP.java
index aa08e0a9f7..1a03d48047 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeleteSFTP.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeleteSFTP.java
@@ -96,7 +96,7 @@ class TestDeleteSFTP {
}
@Test
- void sendsFlowFileToNotFoundWhenDirectoryDoesNotExist() {
+ void sendsFlowFileToNotFoundWhenDeletingAFileAndDirectoryDoesNotExist() {
final Path directoryPath = sshServerRootPath.resolve("rel/path");
final String filename = "not-exist.txt";
final Path fileToDelete = directoryPath.resolve(filename);
@@ -110,7 +110,7 @@ class TestDeleteSFTP {
}
@Test
- void sendsFlowFileToFailureWhenTargetIsADirectory() throws IOException {
+ void sendsFlowFileToFailureWhenDeletingAFileAndTargetIsADirectory() throws
IOException {
Path fileToDelete =
Files.createDirectories(sshServerRootPath.resolve("a/directory"));
enqueue(fileToDelete);
assertExists(fileToDelete);
@@ -122,6 +122,38 @@ class TestDeleteSFTP {
runner.assertPenalizeCount(1);
}
+ @Test
+ void sendsFlowFileToFailureWhenDeletingADirectoryAndDirectoryIsNotEmpty()
throws IOException {
+ final String directoryToDelete = "someDirectory";
+ final Path someDirectoryPath =
Files.createDirectories(sshServerRootPath.resolve(directoryToDelete));
+ final Path someFilePath = someDirectoryPath.resolve("someFile");
+ Files.writeString(someFilePath, "some text");
+ assertExists(someDirectoryPath);
+
+ enqueue(someFilePath);
+ runner.setProperty(DeleteSFTP.REMOVAL_STRATEGY,
DeleteSFTP.RemovalStrategy.DIRECTORY.getValue());
+ runner.run();
+
+ assertExists(someDirectoryPath);
+ runner.assertAllFlowFilesTransferred(DeleteSFTP.REL_FAILURE);
+ runner.assertPenalizeCount(1);
+ }
+
+ @Test
+ void sendsFlowFileToSuccessWhenDeletingADirectoryAndDirectoryIsEmpty()
throws IOException {
+ final String directoryToDelete = "someDirectory";
+ final Path someDirectoryPath =
Files.createDirectories(sshServerRootPath.resolve(directoryToDelete));
+ final Path someFilePath = someDirectoryPath.resolve("someFile");
+ assertExists(someDirectoryPath);
+
+ enqueue(someFilePath);
+ runner.setProperty(DeleteSFTP.REMOVAL_STRATEGY,
DeleteSFTP.RemovalStrategy.DIRECTORY.getValue());
+ runner.run();
+
+ assertNotExists(someDirectoryPath);
+ runner.assertAllFlowFilesTransferred(DeleteSFTP.REL_SUCCESS);
+ }
+
@Test
void sendsFlowFileToFailureWhenFileIsNotADirectChildOfTheDirectory()
throws IOException {
final Path directoryPath =
Files.createDirectories(sshServerRootPath.resolve("rel/path"));
@@ -176,10 +208,10 @@ class TestDeleteSFTP {
}
private static void assertNotExists(Path filePath) {
- assertTrue(Files.notExists(filePath), () -> "File " + filePath +
"still exists");
+ assertTrue(Files.notExists(filePath), () -> "File %s still
exists".formatted(filePath));
}
private static void assertExists(Path filePath) {
- assertTrue(Files.exists(filePath), () -> "File " + filePath + "does
not exist");
+ assertTrue(Files.exists(filePath), () -> "File %s does not
exist".formatted(filePath));
}
}
\ No newline at end of file