Repository: mina-sshd
Updated Branches:
  refs/heads/master f57ad0da9 -> 19d5e8edd


[SSHD-389] Implement a disconnect timeout on the session

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/19d5e8ed
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/19d5e8ed
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/19d5e8ed

Branch: refs/heads/master
Commit: 19d5e8edd1813ff186ce9ba606859c40fda1390e
Parents: f57ad0d
Author: Guillaume Nodet <[email protected]>
Authored: Mon Dec 15 21:46:00 2014 +0100
Committer: Guillaume Nodet <[email protected]>
Committed: Tue Dec 16 00:24:09 2014 +0100

----------------------------------------------------------------------
 .../org/apache/sshd/common/FactoryManager.java  |   8 +
 .../java/org/apache/sshd/common/Session.java    |  17 ++-
 .../sshd/common/session/AbstractSession.java    |  33 +++-
 .../test/java/org/apache/sshd/ServerTest.java   | 150 +++++++++++++++++++
 4 files changed, 204 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/19d5e8ed/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java 
b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index c78f8e8..8179835 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -71,6 +71,14 @@ public interface FactoryManager {
     public static final String IDLE_TIMEOUT = "idle-timeout";
 
     /**
+     * Key used to retrieve the value (in milliseconds) of the disconnect
+     * timeout, which is used when a disconnection is attempted.  If the
+     * disconnect message has not been sent before the timeout elapses, the
+     * underlying socket will be forcibly closed.
+     */
+    public static final String DISCONNECT_TIMEOUT = "disconnect-timeout";
+
+    /**
      * Socket backlog.
      * See {@link 
java.nio.channels.AsynchronousServerSocketChannel#bind(java.net.SocketAddress, 
int)}
      */

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/19d5e8ed/sshd-core/src/main/java/org/apache/sshd/common/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/Session.java 
b/sshd-core/src/main/java/org/apache/sshd/common/Session.java
index f01edad..87e9b49 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/Session.java
@@ -19,6 +19,7 @@
 package org.apache.sshd.common;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.common.future.SshFuture;
 import org.apache.sshd.common.io.IoSession;
@@ -122,11 +123,25 @@ public interface Session extends Closeable {
      *
      * @param buffer the buffer to encode and send
      * @return a future that can be used to check when the packet has actually 
been sent
-     * @throws java.io.IOException if an error occured when encoding sending 
the packet
+     * @throws java.io.IOException if an error occurred when encoding or sending 
the packet
      */
     IoWriteFuture writePacket(Buffer buffer) throws IOException;
 
     /**
+     * Encode and send the given buffer with the specified timeout.
+     * If the buffer could not be written before the timeout elapses, the 
returned
+     * {@link org.apache.sshd.common.io.IoWriteFuture} will be set with a
+     * {@link java.util.concurrent.TimeoutException} exception to indicate a 
timeout.
+     *
+     * @param buffer the buffer to encode and send
+     * @param timeout the timeout
+     * @param unit the time unit of the timeout parameter
+     * @return a future that can be used to check when the packet has actually 
been sent
+     * @throws java.io.IOException if an error occurred when encoding or sending 
the packet
+     */
+    IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) 
throws IOException;
+
+    /**
      * Send a global request and wait for the response.
      * This must only be used when sending a SSH_MSG_GLOBAL_REQUEST with a 
result expected,
      * else it will wait forever.

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/19d5e8ed/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java 
b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 91c0552..a62b1cb 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -26,7 +26,9 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -148,8 +150,9 @@ public abstract class AbstractSession extends 
CloseableUtils.AbstractInnerClosea
     // Session timeout
     protected long authTimeoutTimestamp = 0L;
     protected long idleTimeoutTimestamp = 0L;
-    protected long authTimeoutMs = TimeUnit.MINUTES.toMillis(2);    // 2 
minutes in milliseconds
-    protected long idleTimeoutMs = TimeUnit.MINUTES.toMillis(10);   // 10 
minutes in milliseconds
+    protected long authTimeoutMs = TimeUnit.MINUTES.toMillis(2);          // 2 
minutes in milliseconds
+    protected long idleTimeoutMs = TimeUnit.MINUTES.toMillis(10);         // 
10 minutes in milliseconds
+    protected long disconnectTimeoutMs = TimeUnit.SECONDS.toMillis(10);   // 
10 seconds in milliseconds
     protected final AtomicReference<TimeoutStatus> timeoutStatus = new 
AtomicReference<TimeoutStatus>(TimeoutStatus.NoTimeout);
 
     //
@@ -178,6 +181,7 @@ public abstract class AbstractSession extends 
CloseableUtils.AbstractInnerClosea
         authTimeoutMs = getLongProperty(FactoryManager.AUTH_TIMEOUT, 
authTimeoutMs);
         authTimeoutTimestamp = System.currentTimeMillis() + authTimeoutMs;
         idleTimeoutMs = getLongProperty(FactoryManager.IDLE_TIMEOUT, 
idleTimeoutMs);
+        disconnectTimeoutMs = 
getLongProperty(FactoryManager.DISCONNECT_TIMEOUT, disconnectTimeoutMs);
     }
 
     /**
@@ -517,6 +521,27 @@ public abstract class AbstractSession extends 
CloseableUtils.AbstractInnerClosea
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit 
unit) throws IOException {
+        final IoWriteFuture writeFuture = writePacket(buffer);
+        final DefaultSshFuture<IoWriteFuture> future = 
(DefaultSshFuture<IoWriteFuture>) writeFuture;
+        final ScheduledFuture<?> sched = 
factoryManager.getScheduledExecutorService().schedule(new Runnable() {
+            @Override
+            public void run() {
+                log.info("Timeout writing packet.");
+                future.setValue(new TimeoutException());
+            }
+        }, timeout, unit);
+        future.addListener(new SshFutureListener<IoWriteFuture>() {
+            @Override
+            public void operationComplete(IoWriteFuture future) {
+                sched.cancel(false);
+            }
+        });
+        return writeFuture;
+    }
+
     protected IoWriteFuture doWritePacket(Buffer buffer) throws IOException {
         // Synchronize all write requests as needed by the encoding algorithm
         // and also queue the write request in this synchronized block to 
ensure
@@ -1072,7 +1097,9 @@ public abstract class AbstractSession extends 
CloseableUtils.AbstractInnerClosea
         buffer.putInt(reason);
         buffer.putString(msg);
         buffer.putString("");
-        writePacket(buffer).addListener(new SshFutureListener<IoWriteFuture>() 
{
+        // Write the packet with a timeout to ensure a timely close of the 
session
+        // in case the consumer does not read packets anymore.
+        writePacket(buffer, disconnectTimeoutMs, 
TimeUnit.MILLISECONDS).addListener(new SshFutureListener<IoWriteFuture>() {
             public void operationComplete(IoWriteFuture future) {
                 close(true);
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/19d5e8ed/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java 
b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
index a1ce149..3794c8d 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
@@ -19,22 +19,35 @@
 package org.apache.sshd;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.log4j.Logger;
 import org.apache.sshd.client.SessionFactory;
+import org.apache.sshd.client.channel.ChannelExec;
 import org.apache.sshd.client.channel.ChannelShell;
 import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.client.session.ClientSessionImpl;
+import org.apache.sshd.common.Channel;
 import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.Session;
 import org.apache.sshd.common.SessionListener;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.WindowClosedException;
 import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.session.AbstractConnectionService;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.server.Command;
+import org.apache.sshd.server.CommandFactory;
+import org.apache.sshd.server.Environment;
+import org.apache.sshd.server.ExitCallback;
 import org.apache.sshd.server.command.ScpCommandFactory;
 import org.apache.sshd.server.sftp.SftpSubsystem;
 import org.apache.sshd.util.BaseTest;
@@ -175,6 +188,63 @@ public class ServerTest extends BaseTest {
         assertTrue(TestEchoShellFactory.TestEchoShell.latch.await(1, 
TimeUnit.SECONDS));
     }
 
+    /**
+     * The scenario is the following:
+     *  - create a command that sends continuous data to the client
+     *  - the client does not read the data, filling the ssh window and the 
tcp socket
+     *  - the server session becomes idle, but the ssh disconnect message 
can't be written
+     *  - the server session is forcibly closed
+     */
+    @Test
+    public void testServerIdleTimeoutWithForce() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        sshd.setCommandFactory(new StreamCommand.Factory());
+        sshd.getProperties().put(SshServer.IDLE_TIMEOUT, "5000");
+        sshd.getProperties().put(SshServer.DISCONNECT_TIMEOUT, "2000");
+        sshd.getSessionFactory().addListener(new SessionListener() {
+            public void sessionCreated(Session session) {
+                System.out.println("Session created");
+            }
+
+            public void sessionEvent(Session session, Event event) {
+                System.out.println("Session event: " + event);
+            }
+
+            public void sessionClosed(Session session) {
+                System.out.println("Session closed");
+                latch.countDown();
+            }
+        });
+
+        client = SshClient.setUpDefaultClient();
+        client.start();
+
+        ClientSession s = client.connect("test", "localhost", 
port).await().getSession();
+        s.addPasswordIdentity("test");
+        s.auth().verify();
+        ChannelExec shell = s.createExecChannel("normal");
+        // Create a pipe that will block reading when the buffer is full
+        PipedInputStream pis = new PipedInputStream();
+        PipedOutputStream pos = new PipedOutputStream(pis);
+        shell.setOut(pos);
+        shell.open().await();
+
+        AbstractSession serverSession = 
sshd.getActiveSessions().iterator().next();
+        Channel channel = 
serverSession.getService(AbstractConnectionService.class).getChannels().iterator().next();
+        while (channel.getRemoteWindow().getSize() > 0) {
+            Thread.sleep(1);
+        }
+
+        Logger.getLogger(getClass()).info("Waiting for session idle timeouts");
+
+        long t0 = System.currentTimeMillis();
+        latch.await(1, TimeUnit.MINUTES);
+        long t1 = System.currentTimeMillis();
+        assertTrue(t1 - t0 > 7000);
+        assertTrue(t1 - t0 < 10000);
+    }
+
     @Test
     public void testLanguage() throws Exception {
         client = SshClient.setUpDefaultClient();
@@ -216,6 +286,86 @@ public class ServerTest extends BaseTest {
         }
     }
 
+    public static class StreamCommand implements Command, Runnable {
+
+        public static class Factory implements CommandFactory {
+            @Override
+            public Command createCommand(String name) {
+                return new StreamCommand(name);
+            }
+        }
+
+        public static CountDownLatch latch;
+
+        private final String name;
+        private OutputStream out;
+
+        public StreamCommand(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public void setInputStream(InputStream in) {
+
+        }
+
+        @Override
+        public void setOutputStream(OutputStream out) {
+            this.out = out;
+        }
+
+        @Override
+        public void setErrorStream(OutputStream err) {
+
+        }
+
+        @Override
+        public void setExitCallback(ExitCallback callback) {
+
+        }
+
+        @Override
+        public void start(Environment env) throws IOException {
+            new Thread(this).start();
+        }
+
+        @Override
+        public void destroy() {
+            synchronized (name) {
+                if ("block".equals(name)) {
+                    try {
+                        name.wait();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                Thread.sleep(5000);
+                while (true) {
+                    for (int i = 0; i < 100; i++) {
+                        out.write("0123456789\n".getBytes());
+                    }
+                    out.flush();
+                }
+            } catch (WindowClosedException e) {
+                // ok, do nothing
+            } catch (Throwable e) {
+                e.printStackTrace();
+            } finally {
+                if (latch != null) {
+                    latch.countDown();
+                }
+            }
+        }
+    }
+
+
+
     public static void main(String[] args) throws Exception {
         SshServer sshd = SshServer.setUpDefaultServer();
         sshd.getProperties().put(SshServer.IDLE_TIMEOUT, "10000");

Reply via email to