Repository: mina-sshd
Updated Branches:
  refs/heads/master faacb6c35 -> 6af9457d3


[SSHD-395] Use an ExecutorService to run ScpCommand(s)

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/d6ab2b83
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/d6ab2b83
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/d6ab2b83

Branch: refs/heads/master
Commit: d6ab2b83b419a3af3eeeb19e5950d7f9cedb660b
Parents: faacb6c
Author: Guillaume Nodet <[email protected]>
Authored: Tue Feb 10 10:38:13 2015 +0100
Committer: Guillaume Nodet <[email protected]>
Committed: Tue Feb 10 10:38:13 2015 +0100

----------------------------------------------------------------------
 .../apache/sshd/server/command/ScpCommand.java  | 70 +++++++++++++--
 .../sshd/server/command/ScpCommandFactory.java  | 90 ++++++++++++++++++--
 2 files changed, 144 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d6ab2b83/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java
index be5fc91..4355be4 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java
@@ -21,14 +21,15 @@ package org.apache.sshd.server.command;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.sshd.common.file.FileSystemAware;
 import org.apache.sshd.common.file.FileSystemView;
 import org.apache.sshd.common.scp.ScpHelper;
+import org.apache.sshd.common.util.ThreadUtils;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.Environment;
 import org.apache.sshd.server.ExitCallback;
@@ -59,9 +60,39 @@ public class ScpCommand implements Command, Runnable, FileSystemAware {
     protected OutputStream err;
     protected ExitCallback callback;
     protected IOException error;
+    protected ExecutorService executors;
+    protected boolean shutdownExecutor;
+    protected Future<?> pendingFuture;
 
     public ScpCommand(String command) {
-        this.name = command;
+        this(command, null);
+    }
+
+    public ScpCommand(String command, ExecutorService executorService) {
+        this(command, executorService, false);
+    }
+
+    /**
+     * @param command The command to be executed
+     * @param executorService An {@link ExecutorService} to be used when
+     *        {@link #start(Environment)}-ing execution. If {@code null} an ad-hoc
+     *        single-threaded service is created and used.
+     * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()}
+     *        will be called when command terminates - unless it is the ad-hoc
+     *        service, which will be shutdown regardless
+     * @see ThreadUtils#newSingleThreadExecutor(String)
+     */
+    public ScpCommand(String command, ExecutorService executorService, boolean shutdownOnExit) {
+        name = command;
+
+        if ((executors = executorService) == null) {
+            String poolName = command.replace(' ', '_').replace('/', ':');
+            executors = ThreadUtils.newSingleThreadExecutor(poolName);
+            shutdownExecutor = true;    // we always close the ad-hoc executor service
+        } else {
+            shutdownExecutor = shutdownOnExit;
+        }
+
         log.debug("Executing command {}", command);
         String[] args = command.split(" ");
         for (int i = 1; i < args.length; i++) {
@@ -97,7 +128,7 @@ public class ScpCommand implements Command, Runnable, FileSystemAware {
             }
         }
         if (!optF && !optT) {
-            error = new IOException("Either -f or -t option should be set");
+            error = new IOException("Either -f or -t option should be set for " + command);
         }
     }
 
@@ -125,10 +156,35 @@ public class ScpCommand implements Command, Runnable, FileSystemAware {
         if (error != null) {
             throw error;
         }
-        new Thread(this, "ScpCommand: " + name).start();
+
+        try {
+            pendingFuture = executors.submit(this);
+        } catch (RuntimeException e) {    // e.g., RejectedExecutionException
+            log.error("Failed (" + e.getClass().getSimpleName() + ") to start command=" + name + ": " + e.getMessage(), e);
+            throw new IOException(e);
+        }
     }
 
     public void destroy() {
+        // if thread has not completed, cancel it
+        if ((pendingFuture != null) && (!pendingFuture.isDone())) {
+            boolean result = pendingFuture.cancel(true);
+            // TODO consider waiting some reasonable (?) amount of time for cancellation
+            if (log.isDebugEnabled()) {
+                log.debug("destroy() - cancel pending future=" + result);
+            }
+        }
+
+        pendingFuture = null;
+
+        if ((executors != null) && shutdownExecutor) {
+            Collection<Runnable> runners = executors.shutdownNow();
+            if (log.isDebugEnabled()) {
+                log.debug("destroy() - shutdown executor service - runners count=" + ((runners == null) ? 0 : runners.size()));
+            }
+        }
+
+        executors = null;
     }
 
     public void run() {
@@ -154,7 +210,7 @@ public class ScpCommand implements Command, Runnable, FileSystemAware {
             } catch (IOException e2) {
                 // Ignore
             }
-            log.info("Error in scp command", e);
+            log.info("Error in scp command=" + name, e);
         } finally {
             if (callback != null) {
                 callback.onExit(exitValue, exitMessage);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d6ab2b83/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java
index 51eee6c..07ec477 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java
@@ -18,8 +18,7 @@
  */
 package org.apache.sshd.server.command;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.CommandFactory;
@@ -34,14 +33,85 @@ import org.apache.sshd.server.CommandFactory;
  * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
  */
 public class ScpCommandFactory implements CommandFactory {
+       /**
+        * Command prefix used to identify SCP commands
+        */
+    public static final String SCP_COMMAND_PREFIX = "scp";
 
     private CommandFactory delegate;
+    private ExecutorService executors;
+    private boolean shutdownExecutor;
 
     public ScpCommandFactory() {
+        this(null, null);
     }
 
-    public ScpCommandFactory(CommandFactory delegate) {
-        this.delegate = delegate;
+    /**
+     * @param executorService An {@link ExecutorService} to be used when
+     *        starting {@link ScpCommand} execution. If {@code null} an ad-hoc
+     *        single-threaded service is created and used.
+     */
+    public ScpCommandFactory(ExecutorService executorService) {
+        this(null, executorService);
+    }
+
+    /**
+     * @param delegateFactory A {@link CommandFactory} to be used if the
+     *        command is not an SCP one. If {@code null} then an {@link IllegalArgumentException}
+     *        will be thrown when attempting to invoke {@link #createCommand(String)}
+     *        with a non-SCP command
+     * @see #SCP_COMMAND_PREFIX
+     */
+    public ScpCommandFactory(CommandFactory delegateFactory) {
+        this(delegateFactory, null);
+    }
+
+    /**
+     * @param delegateFactory A {@link CommandFactory} to be used if the
+     *        command is not an SCP one. If {@code null} then an {@link IllegalArgumentException}
+     *        will be thrown when attempting to invoke {@link #createCommand(String)}
+     *        with a non-SCP command
+     * @param executorService An {@link ExecutorService} to be used when
+     *        starting {@link ScpCommand} execution. If {@code null} then a single-threaded
+     *        ad-hoc service is used. <B>Note:</B> the service will <U>not</U> be shutdown
+     *        when the command is terminated - unless it is the ad-hoc service, which will be
+     *        shutdown regardless
+     * @see #ScpCommandFactory(CommandFactory, ExecutorService, boolean)
+     */
+    public ScpCommandFactory(CommandFactory delegateFactory, ExecutorService executorService) {
+        this(delegateFactory, executorService, false);
+    }
+
+    /**
+     * @param delegateFactory A {@link CommandFactory} to be used if the
+     *        command is not an SCP one. If {@code null} then an {@link IllegalArgumentException}
+     *        will be thrown when attempting to invoke {@link #createCommand(String)}
+     *        with a non-SCP command
+     * @param executorService An {@link ExecutorService} to be used when
+     *        starting {@link ScpCommand} execution. If {@code null} then a single-threaded
+     *        ad-hoc service is used. <B>Note:</B> the service will <U>not</U> be shutdown
+     *        when the command is terminated - unless it is the ad-hoc service, which will be
+     *        shutdown regardless
+     * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()}
+     *        will be called when command terminates - unless it is the ad-hoc
+     *        service, which will be shutdown regardless
+     */
+    public ScpCommandFactory(CommandFactory delegateFactory, ExecutorService executorService, boolean shutdownOnExit) {
+        delegate = delegateFactory;
+        executors = executorService;
+        shutdownExecutor = shutdownOnExit;
+    }
+
+    public CommandFactory getDelegateCommandFactory() {
+        return delegate;
+    }
+
+    public ExecutorService getExecutorService() {
+        return executors;
+    }
+
+    public boolean isShutdownOnExit() {
+        return shutdownExecutor;
     }
 
     /**
@@ -51,14 +121,17 @@ public class ScpCommandFactory implements CommandFactory {
      *
      * @param command command to parse 
      * @return configured {@link Command} instance
-     * @throws IllegalArgumentException
+     * @throws IllegalArgumentException if not an SCP command and no
+     *         delegate command factory is available
+     * @see #SCP_COMMAND_PREFIX
      */
     public Command createCommand(String command) {
         try {
-            if (!command.startsWith("scp")) {
-                throw new IllegalArgumentException("Unknown command, does not begin with 'scp'");
+            if (!command.startsWith(SCP_COMMAND_PREFIX)) {
+                throw new IllegalArgumentException("Unknown command, does not begin with '" + SCP_COMMAND_PREFIX + "': " + command);
             }
-            return new ScpCommand(command);
+
+            return new ScpCommand(command, getExecutorService(), isShutdownOnExit());
         } catch (IllegalArgumentException iae) {
             if (delegate != null) {
                 return delegate.createCommand(command);
@@ -66,5 +139,4 @@ public class ScpCommandFactory implements CommandFactory {
             throw iae;
         }
     }
-
 }

Reply via email to