Repository: mina-sshd Updated Branches: refs/heads/master 7f88d3a18 -> 2893dcc42
[SSHD-394] Use an ExecutorService to spawn SftpSubsystem command Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/2893dcc4 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/2893dcc4 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/2893dcc4 Branch: refs/heads/master Commit: 2893dcc42a38bfde654ecc8028005b41afcf887a Parents: 7f88d3a Author: Guillaume Nodet <[email protected]> Authored: Tue Feb 10 11:56:10 2015 +0100 Committer: Guillaume Nodet <[email protected]> Committed: Tue Feb 10 11:56:10 2015 +0100 ---------------------------------------------------------------------- .../apache/sshd/server/sftp/SftpSubsystem.java | 134 +++++++++++++++++-- 1 file changed, 120 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2893dcc4/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java b/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java index 2b8a915..6382aac 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Calendar; +import java.util.Collection; import java.util.Date; import java.util.EnumSet; import java.util.GregorianCalendar; @@ -34,18 +35,21 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.sshd.common.NamedFactory; +import org.apache.sshd.common.file.FileSystemAware; +import org.apache.sshd.common.file.FileSystemView; +import org.apache.sshd.common.file.SshFile; import org.apache.sshd.common.util.Buffer; import org.apache.sshd.common.util.IoUtils; import org.apache.sshd.common.util.SelectorUtils; +import org.apache.sshd.common.util.ThreadUtils; import org.apache.sshd.server.Command; import org.apache.sshd.server.Environment; import org.apache.sshd.server.ExitCallback; -import org.apache.sshd.common.file.FileSystemAware; -import org.apache.sshd.common.file.FileSystemView; import org.apache.sshd.server.SessionAware; -import org.apache.sshd.common.file.SshFile; import org.apache.sshd.server.session.ServerSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,15 +65,55 @@ public class SftpSubsystem implements Command, Runnable, SessionAware, FileSyste public static class Factory implements NamedFactory<Command> { - public Factory() { + public static final String NAME = "sftp"; + + private final ExecutorService executors; + private final boolean shutdownExecutor; + + public Factory() { + this(null); + } + + /** + * @param executorService The {@link ExecutorService} to be used by + * the {@link SftpSubsystem} command when starting execution. If + * {@code null} then a single-threaded ad-hoc service is used. + * <B>Note:</B> the service will <U>not</U> be shutdown when the + * subsystem is closed - unless it is the ad-hoc service, which will be + * shutdown regardless + * @see Factory(ExecutorService, boolean)} + */ + public Factory(ExecutorService executorService) { + this(executorService, false); + } + + /** + * @param executorService The {@link ExecutorService} to be used by + * the {@link SftpSubsystem} command when starting execution. If + * {@code null} then a single-threaded ad-hoc service is used. + * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()} + * will be called when subsystem terminates - unless it is the ad-hoc + * service, which will be shutdown regardless + */ + public Factory(ExecutorService executorService, boolean shutdownOnExit) { + executors = executorService; + shutdownExecutor = shutdownOnExit; + } + + public ExecutorService getExecutorService() { + return executors; + } + + public boolean isShutdownOnExit() { + return shutdownExecutor; } public Command create() { - return new SftpSubsystem(); + return new SftpSubsystem(getExecutorService(), isShutdownOnExit()); } public String getName() { - return "sftp"; + return NAME; } } @@ -163,13 +207,13 @@ public class SftpSubsystem implements Command, Runnable, SessionAware, FileSyste private Environment env; private ServerSession session; private boolean closed = false; - + private ExecutorService executors; + private boolean shutdownExecutor; + private Future<?> pendingFuture; private FileSystemView root; - private int version; private Map<String, Handle> handles = new HashMap<String, Handle>(); - protected static abstract class Handle { SshFile file; @@ -184,7 +228,6 @@ public class SftpSubsystem implements Command, Runnable, SessionAware, FileSyste public void close() throws IOException { file.handleClose(); } - } protected static class DirectoryHandle extends Handle implements Iterator<SshFile> { @@ -292,7 +335,39 @@ public class SftpSubsystem implements Command, Runnable, SessionAware, FileSyste } } - public SftpSubsystem() {} + public SftpSubsystem() { + this(null); + } + + /** + * @param executorService The {@link ExecutorService} to be used by + * the {@link SftpSubsystem} command when starting execution. If + * {@code null} then a single-threaded ad-hoc service is used. + * <b>Note:</b> the service will <U>not</U> be shutdown when the + * subsystem is closed - unless it is the ad-hoc service + * @see #SftpSubsystem(ExecutorService, boolean) + */ + public SftpSubsystem(ExecutorService executorService) { + this(executorService, false); + } + + /** + * @param executorService The {@link ExecutorService} to be used by + * the {@link SftpSubsystem} command when starting execution. If + * {@code null} then a single-threaded ad-hoc service is used. + * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()} + * will be called when subsystem terminates - unless it is the ad-hoc + * service, which will be shutdown regardless + * @see ThreadUtils#newSingleThreadExecutor(String) + */ + public SftpSubsystem(ExecutorService executorService, boolean shutdownOnExit) { + if ((executors = executorService) == null) { + executors = ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName()); + shutdownExecutor = true; // we always close the ad-hoc executor service + } else { + shutdownExecutor = shutdownOnExit; + } + } public void setSession(ServerSession session) { this.session = session; @@ -320,7 +395,12 @@ public class SftpSubsystem implements Command, Runnable, SessionAware, FileSyste public void start(Environment env) throws IOException { this.env = env; - new Thread(this).start(); + try { + pendingFuture = executors.submit(this); + } catch (RuntimeException e) { // e.g., RejectedExecutionException + log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.getMessage(), e); + throw new IOException(e); + } } public void run() { @@ -346,7 +426,7 @@ public class SftpSubsystem implements Command, Runnable, SessionAware, FileSyste process(buffer); } } catch (Throwable t) { - if (!closed && !(t instanceof EOFException)) { // Ignore han + if (!closed && !(t instanceof EOFException)) { // Ignore log.error("Exception caught in SFTP subsystem", t); } } finally { @@ -1119,7 +1199,33 @@ public class SftpSubsystem implements Command, Runnable, SessionAware, FileSyste } public void destroy() { - closed = true; + // if thread has not completed, cancel it + if ((pendingFuture != null) && (!pendingFuture.isDone())) { + boolean result = pendingFuture.cancel(true); + // TODO consider waiting some reasonable (?) amount of time for cancellation + if (log.isDebugEnabled()) { + log.debug("destroy() - cancel pending future=" + result); + } + } + + pendingFuture = null; + + if ((executors != null) && shutdownExecutor) { + Collection<Runnable> runners = executors.shutdownNow(); + if (log.isDebugEnabled()) { + log.debug("destroy() - shutdown executor service - runners count=" + ((runners == null) ? 0 : runners.size())); + } + } + + executors = null; + + if (!closed) { + if (log.isDebugEnabled()) { + log.debug("destroy() - mark as closed"); + } + + closed = true; + } } private SshFile resolveFile(String path) {
