This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9e6ee656b5 NIFI-10532 ensuring client gets reset if any of the key
values host/port/user/pw change on a per ff basis (#6445)
9e6ee656b5 is described below
commit 9e6ee656b54a91384ce2d2d31cd2d0489534f61b
Author: Joe Witt <[email protected]>
AuthorDate: Mon Sep 26 14:00:23 2022 -0700
NIFI-10532 ensuring client gets reset if any of the key values
host/port/user/pw change on a per ff basis (#6445)
---
.../nifi/processors/standard/PutFileTransfer.java | 6 ++---
.../nifi/processors/standard/util/FTPTransfer.java | 18 +++++++++++--
.../processors/standard/util/SFTPTransfer.java | 30 +++++++++++++++++++---
3 files changed, 45 insertions(+), 9 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
index b729679f87..afb6bb6058 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
@@ -102,10 +102,8 @@ public abstract class PutFileTransfer<T extends
FileTransfer> extends AbstractPr
int fileCount = 0;
try (final T transfer = getFileTransfer(context)) {
do {
- //check if hostname is regular expression requiring evaluation
-
if(context.getProperty(FileTransfer.HOSTNAME).isExpressionLanguagePresent()) {
- hostname =
context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
- }
+ //evaluate again inside the loop as each flowfile can have a
different hostname
+ hostname =
context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
final String rootPath =
context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue();
final String workingDirPath;
if (StringUtils.isBlank(rootPath)) {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 2d3e7adb2f..f36c1ed612 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -32,6 +32,7 @@ import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -49,6 +50,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.ftp.FTPClientProvider;
import org.apache.nifi.processors.standard.ftp.StandardFTPClientProvider;
import org.apache.nifi.proxy.ProxyConfiguration;
@@ -152,6 +154,9 @@ public class FTPTransfer implements FileTransfer {
private FTPClient client;
private String homeDirectory;
private String remoteHostName;
+ private String remotePort;
+ private String remoteUsername;
+ private String remotePassword;
public FTPTransfer(final ProcessContext context, final ComponentLog
logger) {
this.ctx = context;
@@ -546,10 +551,16 @@ public class FTPTransfer implements FileTransfer {
private FTPClient getClient(final FlowFile flowFile) throws IOException {
final String hostname =
ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String port =
ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).getValue();
+ final String username =
ctx.getProperty(FileTransfer.USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String password =
ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
if (client != null) {
- if (remoteHostName.equals(hostname)) {
- // destination matches so we can keep our current session
+ if (Objects.equals(remoteHostName, hostname)
+ && Objects.equals(remotePort, port)
+ && Objects.equals(remoteUsername, username)
+ && Objects.equals(remotePassword, password)) {
+ // The key things match so we can keep our current session
resetWorkingDirectory();
return client;
} else {
@@ -561,6 +572,9 @@ public class FTPTransfer implements FileTransfer {
final Map<String, String> attributes = flowFile == null ?
Collections.emptyMap() : flowFile.getAttributes();
client = createClient(ctx, attributes);
remoteHostName = hostname;
+ remotePort = port;
+ remoteUsername = username;
+ remotePassword = password;
closed = false;
homeDirectory = client.printWorkingDirectory();
return client;
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index 2e58ee122d..85eb806f61 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -64,6 +64,7 @@ import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -226,6 +227,12 @@ public class SFTPTransfer implements FileTransfer {
private volatile boolean closed = false;
private String homeDir;
+ private String activeHostname;
+ private String activePort;
+ private String activeUsername;
+ private String activePassword;
+ private String activePrivateKeyPath;
+ private String activePrivateKeyPassphrase;
private final boolean disableDirectoryListing;
@@ -580,12 +587,23 @@ public class SFTPTransfer implements FileTransfer {
}
protected SFTPClient getSFTPClient(final FlowFile flowFile) throws
IOException {
+ final String evaledHostname =
ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String evaledPort =
ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).getValue();
+ final String evaledUsername =
ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String evaledPassword =
ctx.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
+ final String evaledPrivateKeyPath =
ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
+ final String evaledPrivateKeyPassphrase =
ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue();
+
// If the client is already initialized then compare the host that the
client is connected to with the current
// host from the properties/flow-file, and if different then we need
to close and reinitialize, if same we can reuse
if (sftpClient != null) {
- final String clientHost = sshClient.getRemoteHostname();
- final String propertiesHost =
ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
- if (clientHost.equals(propertiesHost)) {
+ if (Objects.equals(evaledHostname, activeHostname)
+ && Objects.equals(evaledPort, activePort)
+ && Objects.equals(evaledUsername, activeUsername)
+ && Objects.equals(evaledPassword, activePassword)
+ && Objects.equals(evaledPrivateKeyPath,
activePrivateKeyPath)
+ && Objects.equals(evaledPrivateKeyPassphrase,
activePrivateKeyPassphrase)
+ ) {
// destination matches so we can keep our current session
return sftpClient;
} else {
@@ -597,6 +615,12 @@ public class SFTPTransfer implements FileTransfer {
final Map<String, String> attributes = flowFile == null ?
Collections.emptyMap() : flowFile.getAttributes();
this.sshClient = SSH_CLIENT_PROVIDER.getClient(ctx, attributes);
this.sftpClient = new SFTPClient(new SFTPEngine(sshClient).init());
+ activeHostname = evaledHostname;
+ activePort = evaledPort;
+ activePassword = evaledPassword;
+ activeUsername = evaledUsername;
+ activePrivateKeyPath = evaledPrivateKeyPath;
+ activePrivateKeyPassphrase = evaledPrivateKeyPassphrase;
this.closed = false;
// Configure timeout for sftp operations