This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 90498a352d NIFI-12249 FetchFTP and FetchSFTP set fetch.failure.reason
on failures
90498a352d is described below
commit 90498a352d059376b96630c1c3f1136313999d17
Author: annanys23 <[email protected]>
AuthorDate: Wed Oct 25 00:24:27 2023 +0000
NIFI-12249 FetchFTP and FetchSFTP set fetch.failure.reason on failures
- Set the fetch.failure.reason attribute to the name of the failure
relationship when routing a FlowFile to any failure relationship
This closes #7929
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/processors/standard/FetchFTP.java | 3 +-
.../processors/standard/FetchFileTransfer.java | 43 +++++++++++++---------
.../apache/nifi/processors/standard/FetchSFTP.java | 3 +-
.../apache/nifi/processors/standard/TestFTP.java | 2 +
.../nifi/processors/standard/TestFetchFTP.java | 11 ++++++
5 files changed, 42 insertions(+), 20 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
index d920cb1085..2323010421 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
@@ -47,7 +47,8 @@ import org.apache.nifi.processors.standard.util.FileTransfer;
@WritesAttribute(attribute = "ftp.remote.port", description = "The port
that was used to communicate with the remote FTP server"),
@WritesAttribute(attribute = "ftp.remote.filename", description = "The
name of the remote file that was pulled"),
@WritesAttribute(attribute = "filename", description = "The filename is
updated to point to the filename fo the remote file"),
- @WritesAttribute(attribute = "path", description = "If the Remote File
contains a directory name, that directory name will be added to the FlowFile
using the 'path' attribute")
+ @WritesAttribute(attribute = "path", description = "If the Remote File
contains a directory name, that directory name will be added to the FlowFile
using the 'path' attribute"),
+ @WritesAttribute(attribute = "fetch.failure.reason", description = "The
name of the failure relationship applied when routing to any failure
relationship")
})
@MultiProcessorUseCase(
description = "Retrieve all files in a directory of an FTP Server",
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index 227904282d..1e8efba2c0 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -61,7 +61,7 @@ public abstract class FetchFileTransfer extends
AbstractProcessor {
static final AllowableValue COMPLETION_NONE = new AllowableValue("None",
"None", "Leave the file as-is");
static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move
File", "Move File", "Move the file to the directory specified by the <Move
Destination Directory> property");
static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete
File", "Delete File", "Deletes the original file from the remote system");
-
+ static final String FAILURE_REASON_ATTRIBUTE = "fetch.failure.reason";
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
@@ -254,36 +254,33 @@ public abstract class FetchFileTransfer extends
AbstractProcessor {
transfer = transferWrapper.getFileTransfer();
}
+ Relationship failureRelationship = null;
+ boolean closeConnOnFailure = false;
+
try {
// Pull data from remote system.
try {
flowFile = transfer.getRemoteFile(filename, flowFile, session);
-
} catch (final FileNotFoundException e) {
+ failureRelationship = REL_NOT_FOUND;
getLogger().log(levelFileNotFound, "Failed to fetch content
for {} from filename {} on remote host {} because the file could not be found
on the remote system; routing to {}",
- flowFile, filename, host, REL_NOT_FOUND.getName());
- session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
- session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
- cleanupTransfer(transfer, false, transferQueue, host, port);
- return;
+ flowFile, filename, host,
failureRelationship.getName());
} catch (final PermissionDeniedException e) {
+ failureRelationship = REL_PERMISSION_DENIED;
getLogger().error("Failed to fetch content for {} from
filename {} on remote host {} due to insufficient permissions; routing to {}",
- flowFile, filename, host,
REL_PERMISSION_DENIED.getName());
- session.transfer(session.penalize(flowFile),
REL_PERMISSION_DENIED);
- session.getProvenanceReporter().route(flowFile,
REL_PERMISSION_DENIED);
- cleanupTransfer(transfer, false, transferQueue, host, port);
- return;
+ flowFile, filename, host,
failureRelationship.getName());
} catch (final ProcessException | IOException e) {
- getLogger().error("Failed to fetch content for {} from
filename {} on remote host {}:{} due to {}; routing to comms.failure",
- flowFile, filename, host, port, e.toString(), e);
- session.transfer(session.penalize(flowFile),
REL_COMMS_FAILURE);
- cleanupTransfer(transfer, true, transferQueue, host, port);
- return;
+ failureRelationship = REL_COMMS_FAILURE;
+ getLogger().error("Failed to fetch content for {} from
filename {} on remote host {}:{} due to {}; routing to {}",
+ flowFile, filename, host, port, e.toString(),
failureRelationship.getName(), e);
+
+ closeConnOnFailure = true;
}
// Add FlowFile attributes
- final String protocolName = transfer.getProtocolName();
final Map<String, String> attributes = new HashMap<>();
+ final String protocolName = transfer.getProtocolName();
+
attributes.put(protocolName + ".remote.host", host);
attributes.put(protocolName + ".remote.port",
String.valueOf(port));
attributes.put(protocolName + ".remote.filename", filename);
@@ -296,6 +293,16 @@ public abstract class FetchFileTransfer extends
AbstractProcessor {
} else {
attributes.put(CoreAttributes.FILENAME.key(), filename);
}
+
+ if (failureRelationship != null) {
+ attributes.put(FAILURE_REASON_ATTRIBUTE,
failureRelationship.getName());
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.transfer(session.penalize(flowFile),
failureRelationship);
+ session.getProvenanceReporter().route(flowFile,
failureRelationship);
+ cleanupTransfer(transfer, closeConnOnFailure, transferQueue,
host, port);
+ return;
+ }
+
flowFile = session.putAllAttributes(flowFile, attributes);
// emit provenance event and transfer FlowFile
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 9fde9114e4..e35c45cfd3 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -47,7 +47,8 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
@WritesAttribute(attribute = "sftp.remote.port", description = "The port
that was used to communicate with the remote SFTP server"),
@WritesAttribute(attribute = "sftp.remote.filename", description = "The
name of the remote file that was pulled"),
@WritesAttribute(attribute = "filename", description = "The filename is
updated to point to the filename fo the remote file"),
- @WritesAttribute(attribute = "path", description = "If the Remote File
contains a directory name, that directory name will be added to the FlowFile
using the 'path' attribute")
+ @WritesAttribute(attribute = "path", description = "If the Remote File
contains a directory name, that directory name will be added to the FlowFile
using the 'path' attribute"),
+ @WritesAttribute(attribute = "fetch.failure.reason", description = "The
name of the failure relationship applied when routing to any failure
relationship")
})
@MultiProcessorUseCase(
description = "Retrieve all files in a directory of an SFTP Server",
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
index 97b6a8e737..6695bb50a7 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
@@ -269,6 +269,7 @@ public class TestFTP {
runner.run();
runner.assertAllFlowFilesTransferred(FetchFTP.REL_NOT_FOUND);
+
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
}
@Test
@@ -290,6 +291,7 @@ public class TestFTP {
runner.run();
runner.assertAllFlowFilesTransferred(FetchFTP.REL_PERMISSION_DENIED);
+
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
}
@Test
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
index 5f636880ec..4fcdf7a21a 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
@@ -113,6 +113,9 @@ public class TestFetchFTP {
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND,
1);
+
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
+ MockFlowFile transferredFlowFile =
runner.getPenalizedFlowFiles().get(0);
+ assertEquals(FetchFileTransfer.REL_NOT_FOUND.getName(),
transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE));
}
@Test
@@ -122,6 +125,9 @@ public class TestFetchFTP {
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED,
1);
+
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
+ MockFlowFile transferredFlowFile =
runner.getPenalizedFlowFiles().get(0);
+ assertEquals(FetchFileTransfer.REL_PERMISSION_DENIED.getName(),
transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE));
}
@Test
@@ -132,6 +138,7 @@ public class TestFetchFTP {
runner.run(2, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED,
2);
+
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
assertEquals(1, proc.numberOfFileTransfers);
assertFalse(proc.isClosed);
@@ -145,6 +152,7 @@ public class TestFetchFTP {
runner.run(2, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND,
2);
+
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
assertEquals(1, proc.numberOfFileTransfers);
assertFalse(proc.isClosed);
@@ -157,6 +165,9 @@ public class TestFetchFTP {
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_COMMS_FAILURE, 1);
+
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
+ MockFlowFile transferredFlowFile =
runner.getPenalizedFlowFiles().get(0);
+ assertEquals(FetchFileTransfer.REL_COMMS_FAILURE.getName(),
transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE));
assertTrue(proc.isClosed);
}