This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ebdc5eb NIFI-7443 Corrected SFTP Keep Alive behavior
ebdc5eb is described below
commit ebdc5eb92b5faf71c5300195d63573792135275d
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Jul 16 13:00:01 2021 -0500
NIFI-7443 Corrected SFTP Keep Alive behavior
- Configured SSHJ Keep Alive Interval of 5 seconds
- Updated Send Keep Alive On Timeout property description
Signed-off-by: Pierre Villard <[email protected]>
This closes #5223.
---
.../processors/standard/util/SFTPTransfer.java | 28 +++++++++++------
.../processors/standard/util/TestSFTPTransfer.java | 36 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 10 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index ee12c82..fd77991 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -90,6 +90,8 @@ import java.util.stream.Collectors;
import static
org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
public class SFTPTransfer implements FileTransfer {
+ private static final int KEEP_ALIVE_INTERVAL_SECONDS = 5;
+
private static final Set<String> DEFAULT_KEY_ALGORITHM_NAMES;
private static final Set<String> DEFAULT_CIPHER_NAMES;
private static final Set<String> DEFAULT_MESSAGE_AUTHENTICATION_CODE_NAMES;
@@ -162,7 +164,7 @@ public class SFTPTransfer implements FileTransfer {
.build();
public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new
PropertyDescriptor.Builder()
.name("Send Keep Alive On Timeout")
- .description("Indicates whether or not to send a single Keep Alive
message when SSH socket times out")
+ .description("Send a Keep Alive message every 5 seconds up to 5 times
for an overall timeout of 25 seconds.")
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
@@ -570,6 +572,20 @@ public class SFTPTransfer implements FileTransfer {
}
};
+    /**
+     * Keep Alive Provider that delegates to the library-supplied KeepAliveProvider.KEEP_ALIVE
+     * and applies KEEP_ALIVE_INTERVAL_SECONDS to the Keep Alive it returns.
+     */
+    private static final KeepAliveProvider DEFAULT_KEEP_ALIVE_PROVIDER = new KeepAliveProvider() {
+        @Override
+        public KeepAlive provide(final ConnectionImpl connection) {
+            // Start from the library-provided Keep Alive and override only the interval
+            final KeepAlive configured = KeepAliveProvider.KEEP_ALIVE.provide(connection);
+            configured.setKeepAliveInterval(KEEP_ALIVE_INTERVAL_SECONDS);
+            return configured;
+        }
+    };
+
+    /**
+     * Selects the Keep Alive Provider according to the Send Keep Alive On Timeout property.
+     *
+     * @return DEFAULT_KEEP_ALIVE_PROVIDER when the property evaluates to true, otherwise NO_OP_KEEP_ALIVE
+     */
+    protected KeepAliveProvider getKeepAliveProvider() {
+        if (ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean()) {
+            return DEFAULT_KEEP_ALIVE_PROVIDER;
+        }
+        return NO_OP_KEEP_ALIVE;
+    }
+
protected SFTPClient getSFTPClient(final FlowFile flowFile) throws
IOException {
// If the client is already initialized then compare the host that the
client is connected to with the current
// host from the properties/flow-file, and if different then we need
to close and reinitialize, if same we can reuse
@@ -586,16 +602,8 @@ public class SFTPTransfer implements FileTransfer {
}
// Initialize a new SSHClient...
-
- // If use keep-alive is set then set the provider which sends max of 5
keep-alives, otherwise set the no-op provider
final DefaultConfig sshClientConfig = new DefaultConfig();
- final boolean useKeepAliveOnTimeout =
ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean();
- if (useKeepAliveOnTimeout) {
- sshClientConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE);
- } else {
- sshClientConfig.setKeepAliveProvider(NO_OP_KEEP_ALIVE);
- }
-
+ sshClientConfig.setKeepAliveProvider(getKeepAliveProvider());
updateConfigAlgorithms(sshClientConfig);
final SSHClient sshClient = new SSHClient(sshClientConfig);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
index f164487..d8fa728 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
@@ -16,12 +16,16 @@
*/
package org.apache.nifi.processors.standard.util;
+import net.schmizz.keepalive.KeepAlive;
+import net.schmizz.keepalive.KeepAliveProvider;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.DefaultConfig;
import net.schmizz.sshj.common.Factory;
+import net.schmizz.sshj.connection.ConnectionImpl;
import net.schmizz.sshj.sftp.Response;
import net.schmizz.sshj.sftp.SFTPClient;
import net.schmizz.sshj.sftp.SFTPException;
+import net.schmizz.sshj.transport.Transport;
import net.schmizz.sshj.userauth.method.AuthKeyboardInteractive;
import net.schmizz.sshj.userauth.method.AuthMethod;
import net.schmizz.sshj.userauth.method.AuthPassword;
@@ -50,6 +54,7 @@ import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -331,4 +336,35 @@ public class TestSFTPTransfer {
final SSHClient sshClient = new SSHClient();
assertThrows(ProcessException.class, () ->
sftpTransfer.getAuthMethods(sshClient, null));
}
+
+    /**
+     * Verifies that enabling Send Keep Alive On Timeout yields a Keep Alive with a positive interval.
+     */
+    @Test
+    public void testGetKeepAliveProviderEnabled() {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        when(processContext.getProperty(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT)).thenReturn(new MockPropertyValue(Boolean.TRUE.toString()));
+
+        final KeepAlive keepAlive = getKeepAlive(processContext);
+        // Compare by value: assertNotSame would autobox both ints and compare object identity
+        assertTrue("Keep Alive Interval not configured", keepAlive.getKeepAliveInterval() > 0);
+    }
+
+    /**
+     * Verifies that disabling Send Keep Alive On Timeout yields a Keep Alive with an interval of zero.
+     */
+    @Test
+    public void testGetKeepAliveProviderDisabled() {
+        final MockPropertyValue disabled = new MockPropertyValue(Boolean.FALSE.toString());
+        final ProcessContext context = mock(ProcessContext.class);
+        when(context.getProperty(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT)).thenReturn(disabled);
+
+        final int interval = getKeepAlive(context).getKeepAliveInterval();
+        assertEquals("Keep Alive Interval configured", 0, interval);
+    }
+
+    /**
+     * Builds an SFTPTransfer for the given context and asks its Keep Alive Provider
+     * for a Keep Alive bound to a connection created over a mocked Transport.
+     *
+     * @param processContext context supplying the Send Keep Alive On Timeout property
+     * @return the Keep Alive produced by the transfer's provider
+     */
+    private KeepAlive getKeepAlive(final ProcessContext processContext) {
+        final SFTPTransfer transfer = createSftpTransfer(processContext, mock(SFTPClient.class));
+
+        // Stub the Transport configuration with defaults before constructing the connection
+        final Transport mockTransport = mock(Transport.class);
+        when(mockTransport.getConfig()).thenReturn(new DefaultConfig());
+        final ConnectionImpl connection = new ConnectionImpl(mockTransport, mock(KeepAliveProvider.class));
+
+        return transfer.getKeepAliveProvider().provide(connection);
+    }
}