Repository: mina-sshd
Updated Branches:
  refs/heads/master 7f88d3a18 -> 2893dcc42


[SSHD-394] Use an ExecutorService to spawn SftpSubsystem command

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/2893dcc4
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/2893dcc4
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/2893dcc4

Branch: refs/heads/master
Commit: 2893dcc42a38bfde654ecc8028005b41afcf887a
Parents: 7f88d3a
Author: Guillaume Nodet <[email protected]>
Authored: Tue Feb 10 11:56:10 2015 +0100
Committer: Guillaume Nodet <[email protected]>
Committed: Tue Feb 10 11:56:10 2015 +0100

----------------------------------------------------------------------
 .../apache/sshd/server/sftp/SftpSubsystem.java  | 134 +++++++++++++++++--
 1 file changed, 120 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2893dcc4/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java 
b/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java
index 2b8a915..6382aac 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.GregorianCalendar;
@@ -34,18 +35,21 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.file.FileSystemAware;
+import org.apache.sshd.common.file.FileSystemView;
+import org.apache.sshd.common.file.SshFile;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.IoUtils;
 import org.apache.sshd.common.util.SelectorUtils;
+import org.apache.sshd.common.util.ThreadUtils;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.Environment;
 import org.apache.sshd.server.ExitCallback;
-import org.apache.sshd.common.file.FileSystemAware;
-import org.apache.sshd.common.file.FileSystemView;
 import org.apache.sshd.server.SessionAware;
-import org.apache.sshd.common.file.SshFile;
 import org.apache.sshd.server.session.ServerSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,15 +65,55 @@ public class SftpSubsystem implements Command, Runnable, 
SessionAware, FileSyste
 
     public static class Factory implements NamedFactory<Command> {
 
-        public Factory() {
+        public static final String NAME = "sftp";
+
+       private final ExecutorService   executors;
+       private final boolean shutdownExecutor;
+
+       public Factory() {
+               this(null);
+       }
+
+        /**
+         * @param executorService The {@link ExecutorService} to be used by
+         *                        the {@link SftpSubsystem} command when 
starting execution. If
+         *                        {@code null} then a single-threaded ad-hoc 
service is used.
+         *                        <B>Note:</B> the service will <U>not</U> be 
shutdown when the
+         *                        subsystem is closed - unless it is the 
ad-hoc service, which will be
+         *                        shutdown regardless
+         * @see #Factory(ExecutorService, boolean)
+         */
+        public Factory(ExecutorService executorService) {
+               this(executorService, false);
+        }
+
+        /**
+         * @param executorService The {@link ExecutorService} to be used by
+         *                        the {@link SftpSubsystem} command when 
starting execution. If
+         *                        {@code null} then a single-threaded ad-hoc 
service is used.
+         * @param shutdownOnExit  If {@code true} the {@link 
ExecutorService#shutdownNow()}
+         *                        will be called when subsystem terminates - 
unless it is the ad-hoc
+         *                        service, which will be shutdown regardless
+         */
+        public Factory(ExecutorService executorService, boolean 
shutdownOnExit) {
+               executors = executorService;
+               shutdownExecutor = shutdownOnExit;
+        }
+
+        public ExecutorService getExecutorService() {
+               return executors;
+        }
+        
+        public boolean isShutdownOnExit() {
+               return shutdownExecutor;
         }
 
         public Command create() {
-            return new SftpSubsystem();
+            return new SftpSubsystem(getExecutorService(), isShutdownOnExit());
         }
 
         public String getName() {
-            return "sftp";
+            return NAME;
         }
     }
 
@@ -163,13 +207,13 @@ public class SftpSubsystem implements Command, Runnable, 
SessionAware, FileSyste
     private Environment env;
     private ServerSession session;
     private boolean closed = false;
-
+       private ExecutorService executors;
+       private boolean shutdownExecutor;
+       private Future<?> pendingFuture;
     private FileSystemView root;
-
     private int version;
     private Map<String, Handle> handles = new HashMap<String, Handle>();
 
-    
     protected static abstract class Handle {
         SshFile file;
 
@@ -184,7 +228,6 @@ public class SftpSubsystem implements Command, Runnable, 
SessionAware, FileSyste
         public void close() throws IOException {
             file.handleClose();
         }
-
     }
 
     protected static class DirectoryHandle extends Handle implements 
Iterator<SshFile> {
@@ -292,7 +335,39 @@ public class SftpSubsystem implements Command, Runnable, 
SessionAware, FileSyste
         }
     }
 
-    public SftpSubsystem() {}
+    public SftpSubsystem() {
+        this(null);
+    }
+
+    /**
+     * @param executorService The {@link ExecutorService} to be used by
+     *                        the {@link SftpSubsystem} command when starting 
execution. If
+     *                        {@code null} then a single-threaded ad-hoc 
service is used.
+     *                        <b>Note:</b> the service will <U>not</U> be 
shutdown when the
+     *                        subsystem is closed - unless it is the ad-hoc 
service
+     * @see #SftpSubsystem(ExecutorService, boolean)
+     */
+    public SftpSubsystem(ExecutorService executorService) {
+        this(executorService, false);
+    }
+
+    /**
+     * @param executorService The {@link ExecutorService} to be used by
+     *                        the {@link SftpSubsystem} command when starting 
execution. If
+     *                        {@code null} then a single-threaded ad-hoc 
service is used.
+     * @param shutdownOnExit  If {@code true} the {@link 
ExecutorService#shutdownNow()}
+     *                        will be called when subsystem terminates - 
unless it is the ad-hoc
+     *                        service, which will be shutdown regardless
+     * @see ThreadUtils#newSingleThreadExecutor(String)
+     */
+    public SftpSubsystem(ExecutorService executorService, boolean 
shutdownOnExit) {
+        if ((executors = executorService) == null) {
+            executors = 
ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName());
+            shutdownExecutor = true;    // we always close the ad-hoc executor 
service
+        } else {
+            shutdownExecutor = shutdownOnExit;
+        }
+    }
 
     public void setSession(ServerSession session) {
         this.session = session;
@@ -320,7 +395,12 @@ public class SftpSubsystem implements Command, Runnable, 
SessionAware, FileSyste
 
     public void start(Environment env) throws IOException {
         this.env = env;
-        new Thread(this).start();
+        try {
+            pendingFuture = executors.submit(this);
+        } catch (RuntimeException e) {    // e.g., RejectedExecutionException
+            log.error("Failed (" + e.getClass().getSimpleName() + ") to start 
command: " + e.getMessage(), e);
+            throw new IOException(e);
+        }
     }
 
     public void run() {
@@ -346,7 +426,7 @@ public class SftpSubsystem implements Command, Runnable, 
SessionAware, FileSyste
                 process(buffer);
             }
         } catch (Throwable t) {
-            if (!closed && !(t instanceof EOFException)) { // Ignore han
+            if (!closed && !(t instanceof EOFException)) { // Ignore
                 log.error("Exception caught in SFTP subsystem", t);
             }
         } finally {
@@ -1119,7 +1199,33 @@ public class SftpSubsystem implements Command, Runnable, 
SessionAware, FileSyste
     }
 
     public void destroy() {
-        closed = true;
+        // if thread has not completed, cancel it
+        if ((pendingFuture != null) && (!pendingFuture.isDone())) {
+            boolean result = pendingFuture.cancel(true);
+            // TODO consider waiting some reasonable (?) amount of time for 
cancellation
+            if (log.isDebugEnabled()) {
+                log.debug("destroy() - cancel pending future=" + result);
+            }
+        }
+
+        pendingFuture = null;
+
+        if ((executors != null) && shutdownExecutor) {
+            Collection<Runnable> runners = executors.shutdownNow();
+            if (log.isDebugEnabled()) {
+                log.debug("destroy() - shutdown executor service - runners 
count=" + ((runners == null) ? 0 : runners.size()));
+            }
+        }
+
+        executors = null;
+
+        if (!closed) {
+            if (log.isDebugEnabled()) {
+                log.debug("destroy() - mark as closed");
+            }
+
+            closed = true;
+        }
     }
 
     private SshFile resolveFile(String path) {

Reply via email to