This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 75217b3 NIFI-6181 FetchSFTP and FetchFTP File Not Found fix
75217b3 is described below
commit 75217b33d0fc43a64b1b80dd581561fda1ece904
Author: bdesert <[email protected]>
AuthorDate: Thu Apr 4 01:40:17 2019 -0400
NIFI-6181 FetchSFTP and FetchFTP File Not Found fix
Signed-off-by: Pierre Villard <[email protected]>
This closes #3407.
---
.../org/apache/nifi/processors/standard/FetchFTP.java | 1 +
.../nifi/processors/standard/FetchFileTransfer.java | 19 ++++++++++++++++++-
.../apache/nifi/processors/standard/FetchSFTP.java | 1 +
.../nifi/processors/standard/util/FTPTransfer.java | 9 ++++++++-
4 files changed, 28 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
index 4b4d207..0c7828f 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
@@ -73,6 +73,7 @@ public class FetchFTP extends FetchFileTransfer {
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(FTPTransfer.BUFFER_SIZE);
+ properties.add(FILE_NOT_FOUND_LOG_LEVEL);
return properties;
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index c44a406..b7e8f61 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -18,12 +18,14 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -120,6 +122,14 @@ public abstract class FetchFileTransfer extends
AbstractProcessor {
.required(false)
.build();
+ static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new
PropertyDescriptor.Builder()
+ .displayName("Log level when file not found")
+ .name("fetchfiletransfer-notfound-loglevel")
+ .description("Log level to use in case the file does not exist when
the processor is triggered")
+ .allowableValues(LogLevel.values())
+ .defaultValue(LogLevel.ERROR.toString()) // backward compatibility
support
+ .required(true)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -141,6 +151,7 @@ public abstract class FetchFileTransfer extends
AbstractProcessor {
private final Map<Tuple<String, Integer>,
BlockingQueue<FileTransferIdleWrapper>> fileTransferMap = new HashMap<>();
private final long IDLE_CONNECTION_MILLIS =
TimeUnit.SECONDS.toMillis(10L); // amount of time to wait before closing an
idle connection
private volatile long lastClearTime = System.currentTimeMillis();
+ private LogLevel levelFileNotFound = LogLevel.ERROR;
@Override
public Set<Relationship> getRelationships() {
@@ -152,6 +163,12 @@ public abstract class FetchFileTransfer extends
AbstractProcessor {
return relationships;
}
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ levelFileNotFound =
LogLevel.valueOf(context.getProperty(FILE_NOT_FOUND_LOG_LEVEL).getValue());
+ }
+
+
/**
* Close connections that are idle or optionally close all connections.
* Connections are considered "idle" if they have not been used in 10
seconds.
@@ -261,7 +278,7 @@ public abstract class FetchFileTransfer extends
AbstractProcessor {
} catch (final FileNotFoundException e) {
closeConnection = false;
- getLogger().error("Failed to fetch content for {} from
filename {} on remote host {} because the file could not be found on the remote
system; routing to {}",
+ getLogger().log(levelFileNotFound, "Failed to fetch content
for {} from filename {} on remote host {} because the file could not be found
on the remote system; routing to {}",
new Object[]{flowFile, filename, host,
REL_NOT_FOUND.getName()});
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 19cba94..2124b68 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -86,6 +86,7 @@ public class FetchSFTP extends FetchFileTransfer {
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
+ properties.add(FILE_NOT_FOUND_LOG_LEVEL);
return properties;
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 85fee40..6a2b2db 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard.util;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
@@ -322,7 +323,13 @@ public class FTPTransfer implements FileTransfer {
final FTPClient client = getClient(flowFile);
InputStream in = client.retrieveFileStream(remoteFileName);
if (in == null) {
- throw new IOException(client.getReplyString());
+ final String response = client.getReplyString();
+ // FTPClient doesn't throw exception if file not found.
+ // Instead, response string will contain: "550 Can't open
<absolute_path>: No such file or directory"
+ if (response != null && response.trim().endsWith("No such file or
directory")){
+ throw new FileNotFoundException(response);
+ }
+ throw new IOException(response);
}
return in;
}