SSHD-253 Improve performance on ServerSession auth and idle timeout checks.
Refactoring of the way ServerSession checks for auth and idle timeouts. The original version was creating, scheduling and unscheduling many Runnable instances while the ServerSession is active. This refactored version creates an 'idle ping' Runnable that checks each of the currently running ServerSessions for timeouts. This check is run once every second. Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/85d11f90 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/85d11f90 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/85d11f90 Branch: refs/heads/master Commit: 85d11f90da5b05e6274cad620d86c469f3c388a2 Parents: 20888f7 Author: Michael Heemskerk <[email protected]> Authored: Thu Aug 29 16:54:10 2013 +0200 Committer: Guillaume Nodet <[email protected]> Committed: Tue Dec 3 21:31:47 2013 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/sshd/SshServer.java | 34 +++++++ .../sshd/common/session/AbstractSession.java | 2 +- .../sshd/server/session/ServerSession.java | 97 ++++++-------------- .../session/ServerSessionTimeoutListener.java | 64 +++++++++++++ .../java/org/apache/sshd/CompressionTest.java | 27 +++--- .../java/org/apache/sshd/SshServerTest.java | 47 ++++++++++ 6 files changed, 190 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/85d11f90/sshd-core/src/main/java/org/apache/sshd/SshServer.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/SshServer.java b/sshd-core/src/main/java/org/apache/sshd/SshServer.java index 57d9faf..d75e201 100644 --- a/sshd-core/src/main/java/org/apache/sshd/SshServer.java +++ b/sshd-core/src/main/java/org/apache/sshd/SshServer.java @@ -32,6 +32,8 @@ import java.util.LinkedList; import java.util.List; import 
java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.apache.sshd.common.AbstractFactoryManager; import org.apache.sshd.common.Channel; @@ -98,6 +100,7 @@ import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; import org.apache.sshd.server.session.ServerSession; import org.apache.sshd.server.session.SessionFactory; import org.apache.sshd.server.sftp.SftpSubsystem; +import org.apache.sshd.server.session.ServerSessionTimeoutListener; import org.apache.sshd.server.shell.ProcessShellFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +143,8 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa protected PasswordAuthenticator passwordAuthenticator; protected PublickeyAuthenticator publickeyAuthenticator; protected GSSAuthenticator gssAuthenticator; + protected ServerSessionTimeoutListener sessionTimeoutListener; + protected ScheduledFuture<?> timeoutListenerFuture; public SshServer() { } @@ -300,6 +305,13 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa sessionFactory.setServer(this); acceptor = createAcceptor(); + // set up the session timeout listener and schedule it + sessionTimeoutListener = createSessionTimeoutListener(); + sessionFactory.addListener(sessionTimeoutListener); + + timeoutListenerFuture = getScheduledExecutorService() + .scheduleAtFixedRate(sessionTimeoutListener, 1, 1, TimeUnit.SECONDS); + if (host != null) { String[] hosts = host.split(","); LinkedList<InetSocketAddress> addresses = new LinkedList<InetSocketAddress>(); @@ -344,6 +356,9 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa for (AbstractSession session : sessions) { session.close(immediately).addListener(listener); } + + stopSessionTimeoutListener(); + if (!immediately) { latch.await(); } @@ -379,6 +394,25 @@ public class 
SshServer extends AbstractFactoryManager implements ServerFactoryMa return new SessionFactory(); } + protected ServerSessionTimeoutListener createSessionTimeoutListener() { + return new ServerSessionTimeoutListener(); + } + + protected void stopSessionTimeoutListener() { + // cancel the timeout monitoring task + if (timeoutListenerFuture != null) { + timeoutListenerFuture.cancel(true); + timeoutListenerFuture = null; + } + + // remove the sessionTimeoutListener completely; should the SSH server be restarted, a new one + // will be created. + if (sessionFactory != null && sessionTimeoutListener != null) { + sessionFactory.removeListener(sessionTimeoutListener); + } + sessionTimeoutListener = null; + } + public static SshServer setUpDefaultServer() { SshServer sshd = new SshServer(); // DHG14 uses 2048 bits key which are not supported by the default JCE provider http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/85d11f90/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java index f6784d3..7b78ea8 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java @@ -142,7 +142,7 @@ public abstract class AbstractSession implements Session { protected final Map<AttributeKey<?>, Object> attributes = new ConcurrentHashMap<AttributeKey<?>, Object>(); protected String username; - private State state = State.ReceiveKexInit; + private volatile State state = State.ReceiveKexInit; /** * Create a new session. 
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/85d11f90/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java index 6791be5..f3d8065 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java @@ -24,9 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.apache.sshd.SshServer; import org.apache.sshd.agent.common.AgentForwardSupport; @@ -39,8 +37,8 @@ import org.apache.sshd.common.SshException; import org.apache.sshd.common.SshdSocketAddress; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; -import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.io.IoSession; +import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.session.AbstractSession; import org.apache.sshd.common.util.Buffer; import org.apache.sshd.server.ServerFactoryManager; @@ -62,12 +60,12 @@ import org.apache.sshd.server.x11.X11ForwardSupport; */ public class ServerSession extends AbstractSession { - private Future authTimerFuture; - private Future idleTimerFuture; + private long authTimeoutTimestamp = 0L; + private long idleTimeoutTimestamp = 0L; private int maxAuthRequests = 20; private int nbAuthRequests; - private int authTimeout = 10 * 60 * 1000; // 10 minutes in milliseconds - private int idleTimeout = 10 * 60 * 1000; // 10 minutes in milliseconds + private int authTimeoutMs = 10 * 60 * 1000; // 10 minutes in milliseconds + private int 
idleTimeoutMs = 10 * 60 * 1000; // 10 minutes in milliseconds private boolean allowMoreSessions = true; private final AgentForwardSupport agentForward; private final X11ForwardSupport x11Forward; @@ -82,8 +80,8 @@ public class ServerSession extends AbstractSession { public ServerSession(ServerFactoryManager server, IoSession ioSession) throws Exception { super(server, ioSession); maxAuthRequests = getIntProperty(ServerFactoryManager.MAX_AUTH_REQUESTS, maxAuthRequests); - authTimeout = getIntProperty(ServerFactoryManager.AUTH_TIMEOUT, authTimeout); - idleTimeout = getIntProperty(ServerFactoryManager.IDLE_TIMEOUT, idleTimeout); + authTimeoutMs = getIntProperty(ServerFactoryManager.AUTH_TIMEOUT, authTimeoutMs); + idleTimeoutMs = getIntProperty(ServerFactoryManager.IDLE_TIMEOUT, idleTimeoutMs); agentForward = new AgentForwardSupport(this); x11Forward = new X11ForwardSupport(this); log.info("Session created from {}", ioSession.getRemoteAddress()); @@ -93,8 +91,6 @@ public class ServerSession extends AbstractSession { @Override public CloseFuture close(boolean immediately) { - unscheduleAuthTimer(); - unscheduleIdleTimer(); agentForward.close(); x11Forward.close(); return super.close(immediately); @@ -124,11 +120,11 @@ public class ServerSession extends AbstractSession { public IoWriteFuture writePacket(Buffer buffer) throws IOException { boolean rescheduleIdleTimer = getState() == State.Running; if (rescheduleIdleTimer) { - unscheduleIdleTimer(); + resetIdleTimeout(); } IoWriteFuture future = super.writePacket(buffer); if (rescheduleIdleTimer) { - scheduleIdleTimer(); + resetIdleTimeout(); } return future; } @@ -187,7 +183,7 @@ public class ServerSession extends AbstractSession { log.debug("Received SSH_MSG_NEWKEYS"); receiveNewKeys(true); setState(State.WaitForAuth); - scheduleAuthTimer(); + resetAuthTimeout(); break; case WaitForAuth: if (cmd != SshConstants.Message.SSH_MSG_SERVICE_REQUEST) { @@ -212,9 +208,8 @@ public class ServerSession extends AbstractSession { 
userAuth(buffer, cmd); break; case Running: - unscheduleIdleTimer(); running(cmd, buffer); - scheduleIdleTimer(); + resetIdleTimeout(); break; default: throw new IllegalStateException("Unsupported state: " + getState()); @@ -278,64 +273,31 @@ public class ServerSession extends AbstractSession { } } - private void scheduleAuthTimer() { - Runnable authTimerTask = new Runnable() { - public void run() { - try { - processAuthTimer(); - } catch (IOException e) { - // Ignore - } + /** + * Checks whether the server session has timed out (both auth and idle timeouts are checked). If the session has + * timed out, a DISCONNECT message will be sent to the client. + * + * @throws IOException + */ + protected void checkForTimeouts() throws IOException { + if (getState() != State.Closed) { + long now = System.currentTimeMillis(); + if (!authed && authTimeoutTimestamp > 0 && now > authTimeoutTimestamp) { + disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "User authentication has timed out after " + authTimeoutMs + " ms."); } - }; - authTimerFuture = getScheduledExecutorService().schedule(authTimerTask, authTimeout, TimeUnit.MILLISECONDS); - } - - private void unscheduleAuthTimer() { - if (authTimerFuture != null) { - authTimerFuture.cancel(false); - authTimerFuture = null; - } - } - - private void scheduleIdleTimer() { - if (idleTimeout < 1) { - // A timeout less than one means there is no timeout. 
- return; - } - synchronized (this) { - unscheduleIdleTimer(); - Runnable idleTimerTask = new Runnable() { - public void run() { - try { - processIdleTimer(); - } catch (IOException e) { - // Ignore - } - } - }; - idleTimerFuture = getScheduledExecutorService().schedule(idleTimerTask, idleTimeout, TimeUnit.MILLISECONDS); - } - } - private void unscheduleIdleTimer() { - synchronized (this) { - if (idleTimerFuture != null) { - idleTimerFuture.cancel(false); - idleTimerFuture = null; + if (idleTimeoutTimestamp > 0 && now > idleTimeoutTimestamp) { + disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "User session has timed out after " + idleTimeoutMs + " ms."); } } } - private void processAuthTimer() throws IOException { - if (!authed) { - disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, - "User authentication has timed out"); - } + private void resetAuthTimeout() { + this.authTimeoutTimestamp = System.currentTimeMillis() + authTimeoutMs; } - private void processIdleTimer() throws IOException { - disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "User session has timed out after being idled for " + idleTimeout + "ms."); + private void resetIdleTimeout() { + this.idleTimeoutTimestamp = System.currentTimeMillis() + idleTimeoutMs; } private void sendServerIdentification() { @@ -498,9 +460,8 @@ public class ServerSession extends AbstractSession { buffer = createBuffer(SshConstants.Message.SSH_MSG_USERAUTH_SUCCESS, 0); writePacket(buffer); this.authed = true; - unscheduleAuthTimer(); setState(State.Running); - scheduleIdleTimer(); + resetIdleTimeout(); log.info("Session {}@{} authenticated", getUsername(), getIoSession().getRemoteAddress()); } else { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/85d11f90/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionTimeoutListener.java ---------------------------------------------------------------------- diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionTimeoutListener.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionTimeoutListener.java new file mode 100644 index 0000000..580249c --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionTimeoutListener.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.server.session; + +import org.apache.mina.util.ConcurrentHashSet; +import org.apache.sshd.common.Session; +import org.apache.sshd.common.SessionListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +/** + * Task that iterates over all currently open {@link ServerSession}s and checks each of them for timeouts. If + * the {@link ServerSession} has timed out (either auth or idle timeout), the session will be disconnected. 
+ * + * @see org.apache.sshd.server.session.ServerSession#checkForTimeouts() + */ +public class ServerSessionTimeoutListener implements SessionListener, Runnable { + + private final Logger log = LoggerFactory.getLogger(ServerSessionTimeoutListener.class); + + private final Set<ServerSession> sessions = new ConcurrentHashSet<ServerSession>(); + + public void sessionCreated(Session session) { + if (session instanceof ServerSession) { + sessions.add((ServerSession) session); + } + } + + public void sessionChanged(Session session) { + // ignore + } + + public void sessionClosed(Session s) { + sessions.remove(s); + } + + public void run() { + for (ServerSession session : sessions) { + try { + session.checkForTimeouts(); + } catch (Exception e) { + log.warn("An error occurred while checking session timeouts", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/85d11f90/sshd-core/src/test/java/org/apache/sshd/CompressionTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/CompressionTest.java b/sshd-core/src/test/java/org/apache/sshd/CompressionTest.java index 73cba2b..9fd407f 100644 --- a/sshd-core/src/test/java/org/apache/sshd/CompressionTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/CompressionTest.java @@ -121,18 +121,21 @@ public class CompressionTest { }); s.connect(); com.jcraft.jsch.Channel c = s.openChannel("shell"); - c.connect(); - OutputStream os = c.getOutputStream(); - InputStream is = c.getInputStream(); - for (int i = 0; i < 10; i++) { - os.write("this is my command\n".getBytes()); - os.flush(); - byte[] data = new byte[512]; - int len = is.read(data); - String str = new String(data, 0, len); - assertEquals("this is my command\n", str); + try { + c.connect(); + OutputStream os = c.getOutputStream(); + InputStream is = c.getInputStream(); + for (int i = 0; i < 10; i++) { + os.write("this is my command\n".getBytes()); + os.flush(); + byte[] data = 
new byte[512]; + int len = is.read(data); + String str = new String(data, 0, len); + assertEquals("this is my command\n", str); + } + } finally { + c.disconnect(); + s.disconnect(); } - c.disconnect(); - s.disconnect(); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/85d11f90/sshd-core/src/test/java/org/apache/sshd/SshServerTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/SshServerTest.java b/sshd-core/src/test/java/org/apache/sshd/SshServerTest.java index 2b3abbe..e6461af 100644 --- a/sshd-core/src/test/java/org/apache/sshd/SshServerTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/SshServerTest.java @@ -18,12 +18,23 @@ */ package org.apache.sshd; +import org.apache.sshd.util.BogusPasswordAuthenticator; +import org.apache.sshd.util.EchoShellFactory; +import org.apache.sshd.util.Utils; import org.junit.Test; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * @author Kohsuke Kawaguchi + * @author Michael Heemskerk */ public class SshServerTest { + @Test public void stopMethodShouldBeIdempotent() throws Exception { SshServer sshd = new SshServer(); @@ -31,4 +42,40 @@ public class SshServerTest { sshd.stop(); sshd.stop(); } + + @Test + public void testExecutorShutdownFalse() throws Exception { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + SshServer sshd = createTestServer(); + sshd.setScheduledExecutorService(executorService); + + sshd.start(); + sshd.stop(); + + assertFalse(executorService.isShutdown()); + } + + @Test + public void testExecutorShutdownTrue() throws Exception { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + SshServer sshd = createTestServer(); + sshd.setScheduledExecutorService(executorService, true); + + 
sshd.start(); + sshd.stop(); + + assertTrue(executorService.isShutdown()); + } + + + private SshServer createTestServer() { + SshServer sshd = SshServer.setUpDefaultServer(); + sshd.setKeyPairProvider(Utils.createTestHostKeyProvider()); + sshd.setShellFactory(new EchoShellFactory()); + sshd.setPasswordAuthenticator(new BogusPasswordAuthenticator()); + + return sshd; + } }
