Repository: mina-sshd Updated Branches: refs/heads/master f57ad0da9 -> 19d5e8edd
[SSHD-389] Implement a disconnect timeout on the session Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/19d5e8ed Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/19d5e8ed Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/19d5e8ed Branch: refs/heads/master Commit: 19d5e8edd1813ff186ce9ba606859c40fda1390e Parents: f57ad0d Author: Guillaume Nodet <[email protected]> Authored: Mon Dec 15 21:46:00 2014 +0100 Committer: Guillaume Nodet <[email protected]> Committed: Tue Dec 16 00:24:09 2014 +0100 ---------------------------------------------------------------------- .../org/apache/sshd/common/FactoryManager.java | 8 + .../java/org/apache/sshd/common/Session.java | 17 ++- .../sshd/common/session/AbstractSession.java | 33 +++- .../test/java/org/apache/sshd/ServerTest.java | 150 +++++++++++++++++++ 4 files changed, 204 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/19d5e8ed/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java index c78f8e8..8179835 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java @@ -71,6 +71,14 @@ public interface FactoryManager { public static final String IDLE_TIMEOUT = "idle-timeout"; /** + * Key used to retrieve the value of the disconnect timeout which + * is used when a disconnection is attempted. If the disconnect + * message has not been sent before the timeout, the underlying socket + * will be forcibly closed. 
+ */ + public static final String DISCONNECT_TIMEOUT = "disconnect-timeout"; + + /** * Socket backlog. * See {@link java.nio.channels.AsynchronousServerSocketChannel#bind(java.net.SocketAddress, int)} */ http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/19d5e8ed/sshd-core/src/main/java/org/apache/sshd/common/Session.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/Session.java index f01edad..87e9b49 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/Session.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/Session.java @@ -19,6 +19,7 @@ package org.apache.sshd.common; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.apache.sshd.common.future.SshFuture; import org.apache.sshd.common.io.IoSession; @@ -122,11 +123,25 @@ public interface Session extends Closeable { * * @param buffer the buffer to encode and send * @return a future that can be used to check when the packet has actually been sent - * @throws java.io.IOException if an error occured when encoding sending the packet + * @throws java.io.IOException if an error occurred when encoding sending the packet */ IoWriteFuture writePacket(Buffer buffer) throws IOException; /** + * Encode and send the given buffer with the specified timeout. + * If the buffer could not be written before the timeout elapses, the returned + * {@link org.apache.sshd.common.io.IoWriteFuture} will be set with a + * {@link java.util.concurrent.TimeoutException} exception to indicate a timeout. 
+ * + * @param buffer the buffer to encode and send + * @param timeout the timeout + * @param unit the time unit of the timeout parameter + * @return a future that can be used to check when the packet has actually been sent + * @throws java.io.IOException if an error occurred when encoding sending the packet + */ + IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) throws IOException; + + /** * Send a global request and wait for the response. * This must only be used when sending a SSH_MSG_GLOBAL_REQUEST with a result expected, * else it will wait forever. http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/19d5e8ed/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java index 91c0552..a62b1cb 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java @@ -26,7 +26,9 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -148,8 +150,9 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea // Session timeout protected long authTimeoutTimestamp = 0L; protected long idleTimeoutTimestamp = 0L; - protected long authTimeoutMs = TimeUnit.MINUTES.toMillis(2); // 2 minutes in milliseconds - protected long idleTimeoutMs = TimeUnit.MINUTES.toMillis(10); // 10 minutes in milliseconds + protected long authTimeoutMs = 
TimeUnit.MINUTES.toMillis(2); // 2 minutes in milliseconds + protected long idleTimeoutMs = TimeUnit.MINUTES.toMillis(10); // 10 minutes in milliseconds + protected long disconnectTimeoutMs = TimeUnit.SECONDS.toMillis(10); // 10 seconds in milliseconds protected final AtomicReference<TimeoutStatus> timeoutStatus = new AtomicReference<TimeoutStatus>(TimeoutStatus.NoTimeout); // @@ -178,6 +181,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea authTimeoutMs = getLongProperty(FactoryManager.AUTH_TIMEOUT, authTimeoutMs); authTimeoutTimestamp = System.currentTimeMillis() + authTimeoutMs; idleTimeoutMs = getLongProperty(FactoryManager.IDLE_TIMEOUT, idleTimeoutMs); + disconnectTimeoutMs = getLongProperty(FactoryManager.DISCONNECT_TIMEOUT, disconnectTimeoutMs); } /** @@ -517,6 +521,27 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea } } + @SuppressWarnings("unchecked") + @Override + public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) throws IOException { + final IoWriteFuture writeFuture = writePacket(buffer); + final DefaultSshFuture<IoWriteFuture> future = (DefaultSshFuture<IoWriteFuture>) writeFuture; + final ScheduledFuture<?> sched = factoryManager.getScheduledExecutorService().schedule(new Runnable() { + @Override + public void run() { + log.info("Timeout writing packet."); + future.setValue(new TimeoutException()); + } + }, timeout, unit); + future.addListener(new SshFutureListener<IoWriteFuture>() { + @Override + public void operationComplete(IoWriteFuture future) { + sched.cancel(false); + } + }); + return writeFuture; + } + protected IoWriteFuture doWritePacket(Buffer buffer) throws IOException { // Synchronize all write requests as needed by the encoding algorithm // and also queue the write request in this synchronized block to ensure @@ -1072,7 +1097,9 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea buffer.putInt(reason); 
buffer.putString(msg); buffer.putString(""); - writePacket(buffer).addListener(new SshFutureListener<IoWriteFuture>() { + // Write the packet with a timeout to ensure a timely close of the session + // in case the consumer does not read packets anymore. + writePacket(buffer, disconnectTimeoutMs, TimeUnit.MILLISECONDS).addListener(new SshFutureListener<IoWriteFuture>() { public void operationComplete(IoWriteFuture future) { close(true); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/19d5e8ed/sshd-core/src/test/java/org/apache/sshd/ServerTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java index a1ce149..3794c8d 100644 --- a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java @@ -19,22 +19,35 @@ package org.apache.sshd; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.log4j.Logger; import org.apache.sshd.client.SessionFactory; +import org.apache.sshd.client.channel.ChannelExec; import org.apache.sshd.client.channel.ChannelShell; import org.apache.sshd.client.future.AuthFuture; import org.apache.sshd.client.session.ClientSessionImpl; +import org.apache.sshd.common.Channel; import org.apache.sshd.common.NamedFactory; import org.apache.sshd.common.Session; import org.apache.sshd.common.SessionListener; import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.channel.WindowClosedException; import org.apache.sshd.common.io.IoSession; +import org.apache.sshd.common.session.AbstractConnectionService; import 
org.apache.sshd.common.session.AbstractSession; import org.apache.sshd.server.Command; +import org.apache.sshd.server.CommandFactory; +import org.apache.sshd.server.Environment; +import org.apache.sshd.server.ExitCallback; import org.apache.sshd.server.command.ScpCommandFactory; import org.apache.sshd.server.sftp.SftpSubsystem; import org.apache.sshd.util.BaseTest; @@ -175,6 +188,63 @@ public class ServerTest extends BaseTest { assertTrue(TestEchoShellFactory.TestEchoShell.latch.await(1, TimeUnit.SECONDS)); } + /** + * The scenario is the following: + * - create a command that sends continuous data to the client + * - the client does not read the data, filling the ssh window and the tcp socket + * - the server session becomes idle, but the ssh disconnect message can't be written + * - the server session is forcibly closed + */ + @Test + public void testServerIdleTimeoutWithForce() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + + sshd.setCommandFactory(new StreamCommand.Factory()); + sshd.getProperties().put(SshServer.IDLE_TIMEOUT, "5000"); + sshd.getProperties().put(SshServer.DISCONNECT_TIMEOUT, "2000"); + sshd.getSessionFactory().addListener(new SessionListener() { + public void sessionCreated(Session session) { + System.out.println("Session created"); + } + + public void sessionEvent(Session session, Event event) { + System.out.println("Session event: " + event); + } + + public void sessionClosed(Session session) { + System.out.println("Session closed"); + latch.countDown(); + } + }); + + client = SshClient.setUpDefaultClient(); + client.start(); + + ClientSession s = client.connect("test", "localhost", port).await().getSession(); + s.addPasswordIdentity("test"); + s.auth().verify(); + ChannelExec shell = s.createExecChannel("normal"); + // Create a pipe that will block reading when the buffer is full + PipedInputStream pis = new PipedInputStream(); + PipedOutputStream pos = new PipedOutputStream(pis); + shell.setOut(pos); + 
shell.open().await(); + + AbstractSession serverSession = sshd.getActiveSessions().iterator().next(); + Channel channel = serverSession.getService(AbstractConnectionService.class).getChannels().iterator().next(); + while (channel.getRemoteWindow().getSize() > 0) { + Thread.sleep(1); + } + + Logger.getLogger(getClass()).info("Waiting for session idle timeouts"); + + long t0 = System.currentTimeMillis(); + latch.await(1, TimeUnit.MINUTES); + long t1 = System.currentTimeMillis(); + assertTrue(t1 - t0 > 7000); + assertTrue(t1 - t0 < 10000); + } + @Test public void testLanguage() throws Exception { client = SshClient.setUpDefaultClient(); @@ -216,6 +286,86 @@ public class ServerTest extends BaseTest { } } + public static class StreamCommand implements Command, Runnable { + + public static class Factory implements CommandFactory { + @Override + public Command createCommand(String name) { + return new StreamCommand(name); + } + } + + public static CountDownLatch latch; + + private final String name; + private OutputStream out; + + public StreamCommand(String name) { + this.name = name; + } + + @Override + public void setInputStream(InputStream in) { + + } + + @Override + public void setOutputStream(OutputStream out) { + this.out = out; + } + + @Override + public void setErrorStream(OutputStream err) { + + } + + @Override + public void setExitCallback(ExitCallback callback) { + + } + + @Override + public void start(Environment env) throws IOException { + new Thread(this).start(); + } + + @Override + public void destroy() { + synchronized (name) { + if ("block".equals(name)) { + try { + name.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + @Override + public void run() { + try { + Thread.sleep(5000); + while (true) { + for (int i = 0; i < 100; i++) { + out.write("0123456789\n".getBytes()); + } + out.flush(); + } + } catch (WindowClosedException e) { + // ok, do nothing + } catch (Throwable e) { + e.printStackTrace(); + } finally { + if 
(latch != null) { + latch.countDown(); + } + } + } + } + + + public static void main(String[] args) throws Exception { SshServer sshd = SshServer.setUpDefaultServer(); sshd.getProperties().put(SshServer.IDLE_TIMEOUT, "10000");
