This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ebdc5eb  NIFI-7443 Corrected SFTP Keep Alive behavior
ebdc5eb is described below

commit ebdc5eb92b5faf71c5300195d63573792135275d
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Jul 16 13:00:01 2021 -0500

    NIFI-7443 Corrected SFTP Keep Alive behavior
    
    - Configured SSHJ Keep Alive Interval of 5 seconds
    - Updated Send Keep Alive On Timeout property description
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #5223.
---
 .../processors/standard/util/SFTPTransfer.java     | 28 +++++++++++------
 .../processors/standard/util/TestSFTPTransfer.java | 36 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index ee12c82..fd77991 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -90,6 +90,8 @@ import java.util.stream.Collectors;
 import static 
org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
 
 public class SFTPTransfer implements FileTransfer {
+    private static final int KEEP_ALIVE_INTERVAL_SECONDS = 5;
+
     private static final Set<String> DEFAULT_KEY_ALGORITHM_NAMES;
     private static final Set<String> DEFAULT_CIPHER_NAMES;
     private static final Set<String> DEFAULT_MESSAGE_AUTHENTICATION_CODE_NAMES;
@@ -162,7 +164,7 @@ public class SFTPTransfer implements FileTransfer {
         .build();
     public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new 
PropertyDescriptor.Builder()
         .name("Send Keep Alive On Timeout")
-        .description("Indicates whether or not to send a single Keep Alive 
message when SSH socket times out")
+        .description("Send a Keep Alive message every 5 seconds up to 5 times 
for an overall timeout of 25 seconds.")
         .allowableValues("true", "false")
         .defaultValue("true")
         .required(true)
@@ -570,6 +572,20 @@ public class SFTPTransfer implements FileTransfer {
         }
     };
 
+    private static final KeepAliveProvider DEFAULT_KEEP_ALIVE_PROVIDER = new 
KeepAliveProvider() {
+        @Override
+        public KeepAlive provide(final ConnectionImpl connection) {
+            final KeepAlive keepAlive = 
KeepAliveProvider.KEEP_ALIVE.provide(connection);
+            keepAlive.setKeepAliveInterval(KEEP_ALIVE_INTERVAL_SECONDS);
+            return keepAlive;
+        }
+    };
+
+    protected KeepAliveProvider getKeepAliveProvider() {
+        final boolean useKeepAliveOnTimeout = 
ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean();
+        return useKeepAliveOnTimeout ? DEFAULT_KEEP_ALIVE_PROVIDER : 
NO_OP_KEEP_ALIVE;
+    }
+
     protected SFTPClient getSFTPClient(final FlowFile flowFile) throws 
IOException {
         // If the client is already initialized then compare the host that the 
client is connected to with the current
         // host from the properties/flow-file, and if different then we need 
to close and reinitialize, if same we can reuse
@@ -586,16 +602,8 @@ public class SFTPTransfer implements FileTransfer {
         }
 
         // Initialize a new SSHClient...
-
-        // If use keep-alive is set then set the provider which sends max of 5 
keep-alives, otherwise set the no-op provider
         final DefaultConfig sshClientConfig = new DefaultConfig();
-        final boolean useKeepAliveOnTimeout = 
ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean();
-        if (useKeepAliveOnTimeout) {
-            sshClientConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE);
-        } else {
-            sshClientConfig.setKeepAliveProvider(NO_OP_KEEP_ALIVE);
-        }
-
+        sshClientConfig.setKeepAliveProvider(getKeepAliveProvider());
         updateConfigAlgorithms(sshClientConfig);
 
         final SSHClient sshClient = new SSHClient(sshClientConfig);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
index f164487..d8fa728 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
@@ -16,12 +16,16 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import net.schmizz.keepalive.KeepAlive;
+import net.schmizz.keepalive.KeepAliveProvider;
 import net.schmizz.sshj.SSHClient;
 import net.schmizz.sshj.DefaultConfig;
 import net.schmizz.sshj.common.Factory;
+import net.schmizz.sshj.connection.ConnectionImpl;
 import net.schmizz.sshj.sftp.Response;
 import net.schmizz.sshj.sftp.SFTPClient;
 import net.schmizz.sshj.sftp.SFTPException;
+import net.schmizz.sshj.transport.Transport;
 import net.schmizz.sshj.userauth.method.AuthKeyboardInteractive;
 import net.schmizz.sshj.userauth.method.AuthMethod;
 import net.schmizz.sshj.userauth.method.AuthPassword;
@@ -50,6 +54,7 @@ import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -331,4 +336,35 @@ public class TestSFTPTransfer {
         final SSHClient sshClient = new SSHClient();
         assertThrows(ProcessException.class, () -> 
sftpTransfer.getAuthMethods(sshClient, null));
     }
+
+    @Test
+    public void testGetKeepAliveProviderEnabled() {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        
when(processContext.getProperty(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT)).thenReturn(new
 MockPropertyValue(Boolean.TRUE.toString()));
+
+        final KeepAlive keepAlive = getKeepAlive(processContext);
+        assertNotSame("Keep Alive Interval not configured", 0, 
keepAlive.getKeepAliveInterval());
+    }
+
+    @Test
+    public void testGetKeepAliveProviderDisabled() {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        
when(processContext.getProperty(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT)).thenReturn(new
 MockPropertyValue(Boolean.FALSE.toString()));
+
+        final KeepAlive keepAlive = getKeepAlive(processContext);
+        assertEquals("Keep Alive Interval configured", 0, 
keepAlive.getKeepAliveInterval());
+    }
+
+    private KeepAlive getKeepAlive(final ProcessContext processContext) {
+        final SFTPClient sftpClient = mock(SFTPClient.class);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, 
sftpClient);
+
+        final Transport transport = mock(Transport.class);
+        when(transport.getConfig()).thenReturn(new DefaultConfig());
+        final KeepAliveProvider mockKeepAliveProvider = 
mock(KeepAliveProvider.class);
+        final ConnectionImpl connection = new ConnectionImpl(transport, 
mockKeepAliveProvider);
+
+        final KeepAliveProvider keepAliveProvider = 
sftpTransfer.getKeepAliveProvider();
+        return keepAliveProvider.provide(connection);
+    }
 }

Reply via email to