Repository: mina-sshd Updated Branches: refs/heads/master faacb6c35 -> 6af9457d3
[SSHD-395] Use an ExecutorService to run ScpCommand(s) Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/d6ab2b83 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/d6ab2b83 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/d6ab2b83 Branch: refs/heads/master Commit: d6ab2b83b419a3af3eeeb19e5950d7f9cedb660b Parents: faacb6c Author: Guillaume Nodet <[email protected]> Authored: Tue Feb 10 10:38:13 2015 +0100 Committer: Guillaume Nodet <[email protected]> Committed: Tue Feb 10 10:38:13 2015 +0100 ---------------------------------------------------------------------- .../apache/sshd/server/command/ScpCommand.java | 70 +++++++++++++-- .../sshd/server/command/ScpCommandFactory.java | 90 ++++++++++++++++++-- 2 files changed, 144 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d6ab2b83/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java index be5fc91..4355be4 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java @@ -21,14 +21,15 @@ package org.apache.sshd.server.command; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.sshd.common.file.FileSystemAware; import org.apache.sshd.common.file.FileSystemView; import org.apache.sshd.common.scp.ScpHelper; +import org.apache.sshd.common.util.ThreadUtils; import org.apache.sshd.server.Command; import org.apache.sshd.server.Environment; import org.apache.sshd.server.ExitCallback; @@ -59,9 +60,39 @@ public class ScpCommand implements Command, Runnable, FileSystemAware { protected OutputStream err; protected ExitCallback callback; protected IOException error; + protected ExecutorService executors; + protected boolean shutdownExecutor; + protected Future<?> pendingFuture; public ScpCommand(String command) { - this.name = command; + this(command, null); + } + + public ScpCommand(String command, ExecutorService executorService) { + this(command, executorService, false); + } + + /** + * @param command The command to be executed + * @param executorService An {@link ExecutorService} to be used when + * {@link #start(Environment)}-ing execution. If {@code null} an ad-hoc + * single-threaded service is created and used. + * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()} + * will be called when command terminates - unless it is the ad-hoc + * service, which will be shutdown regardless + * @see ThreadUtils#newSingleThreadExecutor(String) + */ + public ScpCommand(String command, ExecutorService executorService, boolean shutdownOnExit) { + name = command; + + if ((executors = executorService) == null) { + String poolName = command.replace(' ', '_').replace('/', ':'); + executors = ThreadUtils.newSingleThreadExecutor(poolName); + shutdownExecutor = true; // we always close the ad-hoc executor service + } else { + shutdownExecutor = shutdownOnExit; + } + log.debug("Executing command {}", command); String[] args = command.split(" "); for (int i = 1; i < args.length; i++) { @@ -97,7 +128,7 @@ public class ScpCommand implements Command, Runnable, FileSystemAware { } } if (!optF && !optT) { - error = new IOException("Either -f or -t option should be set"); + error = new IOException("Either -f or -t option should be set for " + command); } } @@ -125,10 +156,35 @@ public class ScpCommand implements Command, Runnable, FileSystemAware { if (error != null) { throw error; } - new Thread(this, "ScpCommand: " + name).start(); + + try { + pendingFuture = executors.submit(this); + } catch (RuntimeException e) { // e.g., RejectedExecutionException + log.error("Failed (" + e.getClass().getSimpleName() + ") to start command=" + name + ": " + e.getMessage(), e); + throw new IOException(e); + } } public void destroy() { + // if thread has not completed, cancel it + if ((pendingFuture != null) && (!pendingFuture.isDone())) { + boolean result = pendingFuture.cancel(true); + // TODO consider waiting some reasonable (?) amount of time for cancellation + if (log.isDebugEnabled()) { + log.debug("destroy() - cancel pending future=" + result); + } + } + + pendingFuture = null; + + if ((executors != null) && shutdownExecutor) { + Collection<Runnable> runners = executors.shutdownNow(); + if (log.isDebugEnabled()) { + log.debug("destroy() - shutdown executor service - runners count=" + ((runners == null) ? 0 : runners.size())); + } + } + + executors = null; } public void run() { @@ -154,7 +210,7 @@ public class ScpCommand implements Command, Runnable, FileSystemAware { } catch (IOException e2) { // Ignore } - log.info("Error in scp command", e); + log.info("Error in scp command=" + name, e); } finally { if (callback != null) { callback.onExit(exitValue, exitMessage); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d6ab2b83/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java index 51eee6c..07ec477 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java @@ -18,8 +18,7 @@ */ package org.apache.sshd.server.command; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.sshd.server.Command; import org.apache.sshd.server.CommandFactory; @@ -34,14 +33,85 @@ import org.apache.sshd.server.CommandFactory; * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a> */ public class ScpCommandFactory implements CommandFactory { + /** + * Command prefix used to identify SCP commands + */ + public static final String SCP_COMMAND_PREFIX = "scp"; private CommandFactory delegate; + private ExecutorService executors; + private boolean shutdownExecutor; public ScpCommandFactory() { + this(null, null); } - public ScpCommandFactory(CommandFactory delegate) { - this.delegate = delegate; + /** + * @param executorService An {@link ExecutorService} to be used when + * starting {@link ScpCommand} execution. If {@code null} an ad-hoc + * single-threaded service is created and used. + */ + public ScpCommandFactory(ExecutorService executorService) { + this(null, executorService); + } + + /** + * @param delegateFactory A {@link CommandFactory} to be used if the + * command is not an SCP one. If {@code null} then an {@link IllegalArgumentException} + * will be thrown when attempting to invoke {@link #createCommand(String)} + * with a non-SCP command + * @see #SCP_COMMAND_PREFIX + */ + public ScpCommandFactory(CommandFactory delegateFactory) { + this(delegateFactory, null); + } + + /** + * @param delegateFactory A {@link CommandFactory} to be used if the + * command is not an SCP one. If {@code null} then an {@link IllegalArgumentException} + * will be thrown when attempting to invoke {@link #createCommand(String)} + * with a non-SCP command + * @param executorService An {@link ExecutorService} to be used when + * starting {@link ScpCommand} execution. If {@code null} then a single-threaded + * ad-hoc service is used. <B>Note:</B> the service will <U>not</U> be shutdown + * when the command is terminated - unless it is the ad-hoc service, which will be + * shutdown regardless + * @see #ScpCommandFactory(CommandFactory, ExecutorService, boolean) + */ + public ScpCommandFactory(CommandFactory delegateFactory, ExecutorService executorService) { + this(delegateFactory, executorService, false); + } + + /** + * @param delegateFactory A {@link CommandFactory} to be used if the + * command is not an SCP one. If {@code null} then an {@link IllegalArgumentException} + * will be thrown when attempting to invoke {@link #createCommand(String)} + * with a non-SCP command + * @param executorService An {@link ExecutorService} to be used when + * starting {@link ScpCommand} execution. If {@code null} then a single-threaded + * ad-hoc service is used. <B>Note:</B> the service will <U>not</U> be shutdown + * when the command is terminated - unless it is the ad-hoc service, which will be + * shutdown regardless + * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()} + * will be called when command terminates - unless it is the ad-hoc + * service, which will be shutdown regardless + */ + public ScpCommandFactory(CommandFactory delegateFactory, ExecutorService executorService, boolean shutdownOnExit) { + delegate = delegateFactory; + executors = executorService; + shutdownExecutor = shutdownOnExit; + } + + public CommandFactory getDelegateCommandFactory() { + return delegate; + } + + public ExecutorService getExecutorService() { + return executors; + } + + public boolean isShutdownOnExit() { + return shutdownExecutor; } /** @@ -51,14 +121,17 @@ public class ScpCommandFactory implements CommandFactory { * * @param command command to parse * @return configured {@link Command} instance - * @throws IllegalArgumentException + * @throws IllegalArgumentException if not an SCP command and no + * delegate command factory is available + * @see #SCP_COMMAND_PREFIX */ public Command createCommand(String command) { try { - if (!command.startsWith("scp")) { - throw new IllegalArgumentException("Unknown command, does not begin with 'scp'"); + if (!command.startsWith(SCP_COMMAND_PREFIX)) { + throw new IllegalArgumentException("Unknown command, does not begin with '" + SCP_COMMAND_PREFIX + "': " + command); } - return new ScpCommand(command); + + return new ScpCommand(command, getExecutorService(), isShutdownOnExit()); } catch (IllegalArgumentException iae) { if (delegate != null) { return delegate.createCommand(command); @@ -66,5 +139,4 @@ public class ScpCommandFactory implements CommandFactory { throw iae; } } - }
