[SSHD-835] Introduce a Closeable ExecutorService
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/7b35bb36 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/7b35bb36 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/7b35bb36 Branch: refs/heads/master Commit: 7b35bb360410f21be06ad16a7d0a0a0d7cc012af Parents: 101a509 Author: Guillaume Nodet <gno...@apache.org> Authored: Thu May 31 09:51:49 2018 +0200 Committer: Guillaume Nodet <gno...@apache.org> Committed: Wed Jul 25 14:37:52 2018 +0200 ---------------------------------------------------------------------- .../sshd/agent/common/AbstractAgentProxy.java | 27 +- .../sshd/agent/local/AgentForwardedChannel.java | 2 +- .../agent/local/ChannelAgentForwarding.java | 6 +- .../local/ChannelAgentForwardingFactory.java | 2 +- .../org/apache/sshd/agent/unix/AgentClient.java | 12 +- .../org/apache/sshd/agent/unix/AgentServer.java | 22 +- .../sshd/agent/unix/AgentServerProxy.java | 23 +- .../sshd/agent/unix/ChannelAgentForwarding.java | 16 +- .../unix/ChannelAgentForwardingFactory.java | 29 +- .../sshd/agent/unix/UnixAgentFactory.java | 59 ++-- .../client/channel/AbstractClientChannel.java | 6 + .../sshd/client/channel/ChannelSession.java | 10 +- .../client/session/AbstractClientSession.java | 4 +- .../client/session/ClientConnectionService.java | 9 +- .../sshd/common/channel/AbstractChannel.java | 31 +-- .../common/io/AbstractIoServiceFactory.java | 18 +- .../io/AbstractIoServiceFactoryFactory.java | 45 +-- .../io/DefaultIoServiceFactoryFactory.java | 17 +- .../sshd/common/io/IoServiceFactoryFactory.java | 5 +- .../sshd/common/io/nio2/Nio2ServiceFactory.java | 19 +- .../io/nio2/Nio2ServiceFactoryFactory.java | 14 +- .../helpers/AbstractConnectionService.java | 38 ++- .../common/util/threads/ExecutorService.java | 26 ++ .../util/threads/ExecutorServiceCarrier.java | 8 - .../util/threads/ExecutorServiceConfigurer.java | 31 --- .../sshd/common/util/threads/ThreadUtils.java | 276 ++++++++++++++++--- .../server/channel/AbstractServerChannel.java | 17 +- .../sshd/server/channel/ChannelSession.java | 9 +- .../server/command/AbstractCommandSupport.java | 14 +- .../command/AbstractFileSystemCommand.java | 6 +- .../sshd/server/forward/DirectTcpipFactory.java | 1 + .../server/forward/ForwardedTcpipFactory.java | 1 + .../sshd/server/forward/TcpipServerChannel.java | 43 +-- .../server/session/ServerConnectionService.java | 8 +- .../forward/AbstractServerCloseTestSupport.java | 15 +- .../io/DefaultIoServiceFactoryFactoryTest.java | 22 +- .../sshd/common/util/ThreadUtilsTest.java | 5 +- .../java/org/apache/sshd/server/ServerTest.java | 2 +- .../sshd/util/test/CommandExecutionHelper.java | 2 +- .../org/apache/sshd/git/AbstractGitCommand.java | 6 +- .../sshd/git/AbstractGitCommandFactory.java | 14 +- .../apache/sshd/git/pack/GitPackCommand.java | 8 +- .../sshd/git/pack/GitPackCommandFactory.java | 10 +- .../org/apache/sshd/git/pgm/GitPgmCommand.java | 8 +- .../sshd/git/pgm/GitPgmCommandFactory.java | 10 +- .../sshd/common/io/mina/MinaServiceFactory.java | 9 +- .../io/mina/MinaServiceFactoryFactory.java | 21 +- .../netty/NettyIoServiceFactoryFactory.java | 5 +- .../sshd/client/scp/AbstractScpClient.java | 4 + .../org/apache/sshd/server/scp/ScpCommand.java | 9 +- .../sshd/server/scp/ScpCommandFactory.java | 26 +- .../org/apache/sshd/client/scp/ScpTest.java | 8 +- .../sshd/server/scp/ScpCommandFactoryTest.java | 6 +- .../server/subsystem/sftp/SftpSubsystem.java | 17 +- .../subsystem/sftp/SftpSubsystemFactory.java | 33 +-- .../client/subsystem/sftp/SftpVersionsTest.java | 4 +- .../SpaceAvailableExtensionImplTest.java | 2 +- .../openssh/helpers/OpenSSHExtensionsTest.java | 2 +- .../sftp/SftpSubsystemFactoryTest.java | 6 +- 59 files changed, 556 insertions(+), 552 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java index 028b48e..0432eec 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; import org.apache.sshd.agent.SshAgent; import org.apache.sshd.agent.SshAgentConstants; @@ -38,19 +37,20 @@ import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.BufferUtils; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.logging.AbstractLoggingBean; -import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer; +import org.apache.sshd.common.util.threads.ExecutorService; +import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; /** * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public abstract class AbstractAgentProxy extends AbstractLoggingBean implements SshAgent, ExecutorServiceConfigurer { +public abstract class AbstractAgentProxy extends AbstractLoggingBean implements SshAgent, ExecutorServiceCarrier { private ExecutorService executor; - private boolean shutdownExecutor; private String channelType = FactoryManager.AGENT_FORWARDING_TYPE_OPENSSH; - protected AbstractAgentProxy() { + protected AbstractAgentProxy(ExecutorService executorService) { super(); + executor = executorService; } public String getChannelType() { @@ -67,21 +67,6 @@ public abstract class AbstractAgentProxy extends AbstractLoggingBean implements } @Override - public void setExecutorService(ExecutorService service) { - executor = service; - } - - @Override - public boolean isShutdownOnExit() { - return shutdownExecutor; - } - - @Override - public void setShutdownOnExit(boolean shutdown) { - shutdownExecutor = shutdown; - } - - @Override public List<? extends Map.Entry<PublicKey, String>> getIdentities() throws IOException { int cmd = SshAgentConstants.SSH2_AGENTC_REQUEST_IDENTITIES; int okcmd = SshAgentConstants.SSH2_AGENT_IDENTITIES_ANSWER; @@ -213,7 +198,7 @@ public abstract class AbstractAgentProxy extends AbstractLoggingBean implements @Override public void close() throws IOException { ExecutorService service = getExecutorService(); - if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) { + if ((service != null) && (!service.isShutdown())) { Collection<?> runners = service.shutdownNow(); if (log.isDebugEnabled()) { log.debug("close() - shutdown runners count=" + GenericUtils.size(runners)); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java index 9306bc8..7657937 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java @@ -46,7 +46,7 @@ public class AgentForwardedChannel extends AbstractClientChannel { } public SshAgent getAgent() { - AbstractAgentProxy rtn = new AbstractAgentProxy() { + AbstractAgentProxy rtn = new AbstractAgentProxy(null) { private final AtomicBoolean open = new AtomicBoolean(true); @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java index a1868c7..e051f1e 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java @@ -20,6 +20,7 @@ package org.apache.sshd.agent.local; import java.io.IOException; import java.io.OutputStream; +import java.util.Collections; import java.util.Objects; import org.apache.sshd.agent.SshAgent; @@ -36,6 +37,7 @@ import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.server.channel.AbstractServerChannel; /** @@ -46,8 +48,8 @@ public class ChannelAgentForwarding extends AbstractServerChannel { private SshAgent agent; private AgentClient client; - public ChannelAgentForwarding() { - super(); + public ChannelAgentForwarding(ExecutorService executor) { + super("", Collections.emptyList(), executor); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java index 1396b61..40f9135 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java @@ -44,6 +44,6 @@ public class ChannelAgentForwardingFactory implements ChannelFactory { @Override public Channel create() { - return new ChannelAgentForwarding(); + return new ChannelAgentForwarding(null); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java index 97acd90..7f80a89 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,6 +29,7 @@ import org.apache.sshd.agent.common.AbstractAgentProxy; import org.apache.sshd.common.SshException; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.tomcat.jni.Local; import org.apache.tomcat.jni.Pool; @@ -50,15 +50,13 @@ public class AgentClient extends AbstractAgentProxy implements Runnable { private final AtomicBoolean open = new AtomicBoolean(true); public AgentClient(String authSocket) throws IOException { - this(authSocket, null, false); + this(authSocket, null); } - public AgentClient(String authSocket, ExecutorService executor, boolean shutdownOnExit) throws IOException { + public AgentClient(String authSocket, ExecutorService executor) throws IOException { + super((executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentClient[" + authSocket + "]") : executor); this.authSocket = authSocket; - setExecutorService((executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentClient[" + authSocket + "]") : executor); - setShutdownOnExit((executor == null) || shutdownOnExit); - try { pool = Pool.create(AprLibrary.getInstance().getRootPool()); handle = Local.create(authSocket, pool); @@ -141,7 +139,7 @@ public class AgentClient extends AbstractAgentProxy implements Runnable { Socket.close(handle); } - if ((pumper != null) && isShutdownOnExit() && (!pumper.isDone())) { + if ((pumper != null) && (!pumper.isDone())) { pumper.cancel(true); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java index 113ea23..6c6e13a 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java @@ -21,7 +21,6 @@ package org.apache.sshd.agent.unix; import java.io.Closeable; import java.io.IOException; import java.util.Collection; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.apache.sshd.agent.SshAgent; @@ -31,6 +30,7 @@ import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.logging.AbstractLoggingBean; +import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.tomcat.jni.Local; @@ -46,24 +46,23 @@ public class AgentServer extends AbstractLoggingBean implements Closeable, Execu private final SshAgent agent; private final ExecutorService service; - private final boolean shutdownExecutor; private Future<?> agentThread; private String authSocket; private long pool; private long handle; public AgentServer() { - this(null, false); + this(null); } - public AgentServer(ExecutorService executor, boolean shutdownOnExit) { - this(new AgentImpl(), executor, shutdownOnExit); + public AgentServer(ExecutorService executor) { + this(new AgentImpl(), executor); } - public AgentServer(SshAgent agent, ExecutorService executor, boolean shutdownOnExit) { + public AgentServer(SshAgent agent, ExecutorService executor) { this.agent = agent; - this.service = (executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentServer[" + agent + "]") : executor; - this.shutdownExecutor = service != executor || shutdownOnExit; + this.service = (executor == null) + ? ThreadUtils.newSingleThreadExecutor("AgentServer[" + agent + "]") : executor; } public SshAgent getAgent() { @@ -75,11 +74,6 @@ public class AgentServer extends AbstractLoggingBean implements Closeable, Execu return service; } - @Override - public boolean isShutdownOnExit() { - return shutdownExecutor; - } - public String start() throws Exception { authSocket = AprLibrary.createLocalSocketAddress(); pool = Pool.create(AprLibrary.getInstance().getRootPool()); @@ -129,7 +123,7 @@ public class AgentServer extends AbstractLoggingBean implements Closeable, Execu } ExecutorService executor = getExecutorService(); - if ((executor != null) && isShutdownOnExit() && (!executor.isShutdown())) { + if ((executor != null) && (!executor.isShutdown())) { Collection<?> runners = executor.shutdownNow(); if (log.isDebugEnabled()) { log.debug("Shut down runners count=" + GenericUtils.size(runners)); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java index 5b66e57..4401a0d 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java @@ -22,7 +22,6 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collection; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,7 +32,7 @@ import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.OsUtils; import org.apache.sshd.common.util.logging.AbstractLoggingBean; -import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; +import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.tomcat.jni.Local; import org.apache.tomcat.jni.Pool; @@ -43,7 +42,7 @@ import org.apache.tomcat.jni.Status; /** * The server side fake agent, acting as an agent, but actually forwarding the requests to the auth channel on the client side. */ -public class AgentServerProxy extends AbstractLoggingBean implements SshAgentServer, ExecutorServiceCarrier { +public class AgentServerProxy extends AbstractLoggingBean implements SshAgentServer { /** * Property that can be set on the {@link Session} in order to control * the authentication timeout (millis). If not specified then @@ -61,15 +60,14 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer private final long handle; private Future<?> piper; private final ExecutorService pipeService; - private final boolean pipeCloseOnExit; private final AtomicBoolean open = new AtomicBoolean(true); private final AtomicBoolean innerFinished = new AtomicBoolean(false); public AgentServerProxy(ConnectionService service) throws IOException { - this(service, null, false); + this(service, null); } - public AgentServerProxy(ConnectionService service, ExecutorService executor, boolean shutdownOnExit) throws IOException { + public AgentServerProxy(ConnectionService service, ExecutorService executor) throws IOException { this.service = service; try { String authSocket = AprLibrary.createLocalSocketAddress(); @@ -89,8 +87,9 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer throw toIOException(result); } - pipeService = (executor == null) ? ThreadUtils.newSingleThreadExecutor("sshd-AgentServerProxy-PIPE-" + authSocket) : executor; - pipeCloseOnExit = executor != pipeService || shutdownOnExit; + pipeService = (executor == null) + ? ThreadUtils.newSingleThreadExecutor("sshd-AgentServerProxy-PIPE-" + authSocket) + : ThreadUtils.noClose(executor); piper = pipeService.submit(() -> { try { boolean debugEnabled = log.isDebugEnabled(); @@ -134,17 +133,11 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer return open.get(); } - @Override public ExecutorService getExecutorService() { return pipeService; } @Override - public boolean isShutdownOnExit() { - return pipeCloseOnExit; - } - - @Override public String getId() { return authSocket; } @@ -194,7 +187,7 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer } ExecutorService executor = getExecutorService(); - if ((executor != null) && isShutdownOnExit() && (!executor.isShutdown())) { + if ((executor != null) && (!executor.isShutdown())) { Collection<?> runners = executor.shutdownNow(); if (debugEnabled) { log.debug("Shut down runners count=" + GenericUtils.size(runners)); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java index 395bd37..586faae 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java @@ -21,7 +21,7 @@ package org.apache.sshd.agent.unix; import java.io.IOException; import java.io.OutputStream; import java.util.Collection; -import java.util.concurrent.ExecutorService; +import java.util.Collections; import java.util.concurrent.Future; import org.apache.sshd.agent.SshAgent; @@ -33,6 +33,7 @@ import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.sshd.server.channel.AbstractServerChannel; import org.apache.tomcat.jni.Local; @@ -63,10 +64,9 @@ public class ChannelAgentForwarding extends AbstractServerChannel { private OutputStream out; private ExecutorService forwardService; private Future<?> forwarder; - private boolean shutdownForwarder; - public ChannelAgentForwarding() { - super(); + public ChannelAgentForwarding(ExecutorService executor) { + super("", Collections.emptyList(), executor); } @Override @@ -83,8 +83,9 @@ public class ChannelAgentForwarding extends AbstractServerChannel { } ExecutorService service = getExecutorService(); - forwardService = (service == null) ? ThreadUtils.newSingleThreadExecutor("ChannelAgentForwarding[" + authSocket + "]") : service; - shutdownForwarder = service != forwardService || isShutdownOnExit(); + forwardService = (service == null) + ? ThreadUtils.newSingleThreadExecutor("ChannelAgentForwarding[" + authSocket + "]") + : ThreadUtils.noClose(service); final int copyBufSize = this.getIntProperty(FORWARDER_BUFFER_SIZE, DEFAULT_FORWARDER_BUF_SIZE); ValidateUtils.checkTrue(copyBufSize >= MIN_FORWARDER_BUF_SIZE, "Copy buf size below min.: %d", copyBufSize); @@ -136,7 +137,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel { } try { - if ((forwardService != null) && shutdownForwarder) { + if (forwardService != null) { Collection<?> runners = forwardService.shutdownNow(); if (log.isDebugEnabled()) { log.debug("Shut down runners count=" + GenericUtils.size(runners)); @@ -144,7 +145,6 @@ public class ChannelAgentForwarding extends AbstractServerChannel { } } finally { forwardService = null; - shutdownForwarder = false; } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java index 4f31f41..e4d37d8 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java @@ -18,25 +18,31 @@ */ package org.apache.sshd.agent.unix; -import java.util.concurrent.ExecutorService; - +import org.apache.sshd.common.Factory; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelFactory; import org.apache.sshd.common.util.ValidateUtils; -import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; +import org.apache.sshd.common.util.threads.ExecutorService; /** * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public class ChannelAgentForwardingFactory implements ChannelFactory, ExecutorServiceCarrier { +public class ChannelAgentForwardingFactory implements ChannelFactory { + public static final ChannelAgentForwardingFactory OPENSSH = new ChannelAgentForwardingFactory("auth-ag...@openssh.com"); // see https://tools.ietf.org/html/draft-ietf-secsh-agent-02 public static final ChannelAgentForwardingFactory IETF = new ChannelAgentForwardingFactory("auth-agent"); private final String name; + private final Factory<ExecutorService> executorServiceFactory; public ChannelAgentForwardingFactory(String name) { + this(name, null); + } + + public ChannelAgentForwardingFactory(String name, Factory<ExecutorService> executorServiceFactory) { this.name = ValidateUtils.checkNotNullAndNotEmpty(name, "No channel factory name specified"); + this.executorServiceFactory = executorServiceFactory; } @Override @@ -44,21 +50,10 @@ public class ChannelAgentForwardingFactory implements ChannelFactory, ExecutorSe return name; } - @Override // user can override to provide an alternative - public ExecutorService getExecutorService() { - return null; - } - - @Override - public boolean isShutdownOnExit() { - return false; - } - @Override public Channel create() { - ChannelAgentForwarding channel = new ChannelAgentForwarding(); - channel.setExecutorService(getExecutorService()); - channel.setShutdownOnExit(isShutdownOnExit()); + ExecutorService executorService = executorServiceFactory != null ? executorServiceFactory.create() : null; + ChannelAgentForwarding channel = new ChannelAgentForwarding(executorService); return channel; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java index 128214b..02c4f62 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java @@ -23,12 +23,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import org.apache.sshd.agent.SshAgent; import org.apache.sshd.agent.SshAgentFactory; import org.apache.sshd.agent.SshAgentServer; +import org.apache.sshd.common.Factory; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.NamedFactory; import org.apache.sshd.common.SshException; @@ -37,65 +37,40 @@ import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; -import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer; +import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.server.session.ServerSession; /** * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public class UnixAgentFactory implements SshAgentFactory, ExecutorServiceConfigurer { +public class UnixAgentFactory implements SshAgentFactory { public static final List<NamedFactory<Channel>> DEFAULT_FORWARDING_CHANNELS = Collections.unmodifiableList( Arrays.<NamedFactory<Channel>>asList(ChannelAgentForwardingFactory.OPENSSH, ChannelAgentForwardingFactory.IETF)); - private ExecutorService executor; - private boolean shutdownExecutor; + private Factory<ExecutorService> executorServiceFactory; public UnixAgentFactory() { super(); } - public UnixAgentFactory(ExecutorService service, boolean shutdown) { - executor = service; - shutdownExecutor = shutdown; + public UnixAgentFactory(Factory<ExecutorService> factory) { + executorServiceFactory = factory; } - @Override - public ExecutorService getExecutorService() { - return executor; - } - - @Override - public void setExecutorService(ExecutorService service) { - executor = service; - } - - @Override - public boolean isShutdownOnExit() { - return shutdownExecutor; - } - - @Override - public void setShutdownOnExit(boolean shutdown) { - shutdownExecutor = shutdown; + protected ExecutorService newExecutor() { + return executorServiceFactory != null ? executorServiceFactory.create() : null; } @Override public List<NamedFactory<Channel>> getChannelForwardingFactories(FactoryManager manager) { - final ExecutorServiceConfigurer configurer = this; - return Collections.unmodifiableList(DEFAULT_FORWARDING_CHANNELS.stream() - .map(cf -> new ChannelAgentForwardingFactory(cf.getName()) { - @Override - public ExecutorService getExecutorService() { - return configurer.getExecutorService(); - } - - @Override - public boolean isShutdownOnExit() { - return configurer.isShutdownOnExit(); - } - }) - .collect(Collectors.toList())); + if (executorServiceFactory != null) { + return DEFAULT_FORWARDING_CHANNELS.stream() + .map(cf -> new ChannelAgentForwardingFactory(cf.getName(), executorServiceFactory)) + .collect(Collectors.toList()); + } else { + return DEFAULT_FORWARDING_CHANNELS; + } } @Override @@ -105,7 +80,7 @@ public class UnixAgentFactory implements SshAgentFactory, ExecutorServiceConfigu throw new SshException("No " + SshAgent.SSH_AUTHSOCKET_ENV_NAME + " value"); } - return new AgentClient(authSocket, getExecutorService(), isShutdownOnExit()); + return new AgentClient(authSocket, newExecutor()); } @Override @@ -113,6 +88,6 @@ public class UnixAgentFactory implements SshAgentFactory, ExecutorServiceConfigu Session session = Objects.requireNonNull(service.getSession(), "No session"); ValidateUtils.checkInstanceOf(session, ServerSession.class, "The session used to create an agent server proxy must be a server session: %s", session); - return new AgentServerProxy(service, getExecutorService(), isShutdownOnExit()); + return new AgentServerProxy(service, newExecutor()); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index 78a6e12..1ed4518 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -101,6 +101,12 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C }); } +// TODO: investigate how to fix the forwarding channel failures when enabled +// @Override +// public ClientSession getSession() { +// return (ClientSession) super.getSession(); +// } + protected void addChannelSignalRequestHandlers(EventNotifier<String> notifier) { addRequestHandler(new ExitStatusChannelRequestHandler(exitStatusHolder, notifier)); addRequestHandler(new ExitSignalChannelRequestHandler(exitSignalHolder, notifier)); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java index 459a659..b3f8c67 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java @@ -20,7 +20,6 @@ package org.apache.sshd.client.channel; import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.apache.sshd.common.SshConstants; @@ -35,6 +34,7 @@ import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.common.util.threads.ThreadUtils; /** @@ -46,7 +46,6 @@ public class ChannelSession extends AbstractClientChannel { private ExecutorService pumperService; private Future<?> pumper; - private boolean shutdownPumper; public ChannelSession() { super("session"); @@ -93,12 +92,9 @@ public class ChannelSession extends AbstractClientChannel { if (service == null) { pumperService = ThreadUtils.newSingleThreadExecutor("ClientInputStreamPump[" + this.toString() + "]"); } else { - pumperService = service; + pumperService = ThreadUtils.noClose(service); } - // shutdown the temporary executor service if had to create it - shutdownPumper = (pumperService != service) || isShutdownOnExit(); - // Interrupt does not really work and the thread will only exit when // the call to read() will return. So ensure this thread is a daemon // to avoid blocking the whole app @@ -130,7 +126,7 @@ public class ChannelSession extends AbstractClientChannel { @Override protected void doCloseImmediately() { - if ((pumper != null) && (pumperService != null) && shutdownPumper && (!pumperService.isShutdown())) { + if ((pumper != null) && (pumperService != null) && (!pumperService.isShutdown())) { try { if (!pumper.isDone()) { pumper.cancel(true); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java b/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java index 0aa5039..5ec3a69 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java @@ -443,8 +443,8 @@ public abstract class AbstractClientSession extends AbstractSession implements C @Override public KeyExchangeFuture switchToNoneCipher() throws IOException { - if (!(currentService instanceof AbstractConnectionService<?>) - || !GenericUtils.isEmpty(((AbstractConnectionService<?>) currentService).getChannels())) { + if (!(currentService instanceof AbstractConnectionService) + || !GenericUtils.isEmpty(((AbstractConnectionService) currentService).getChannels())) { throw new IllegalStateException("The switch to the none cipher must be done immediately after authentication"); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java index 1743a8e..497e61e 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java @@ -39,7 +39,9 @@ import org.apache.sshd.server.x11.X11ForwardSupport; * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public class ClientConnectionService extends AbstractConnectionService<AbstractClientSession> implements ClientSessionHolder { +public class ClientConnectionService + extends AbstractConnectionService + implements ClientSessionHolder { private ScheduledFuture<?> heartBeat; @@ -53,6 +55,11 @@ public class ClientConnectionService extends AbstractConnectionService<AbstractC } @Override + public AbstractClientSession getSession() { + return (AbstractClientSession) super.getSession(); + } + + @Override public void start() { ClientSession session = getClientSession(); if (!session.isAuthenticated()) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index 94e2d20..6d3c78e 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -29,7 +29,6 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -59,7 +58,8 @@ import org.apache.sshd.common.util.buffer.BufferUtils; import org.apache.sshd.common.util.closeable.AbstractInnerCloseable; import org.apache.sshd.common.util.closeable.IoBaseCloseable; import org.apache.sshd.common.util.io.IoUtils; -import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer; +import org.apache.sshd.common.util.threads.ExecutorService; +import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; /** * Provides common client/server channel functionality @@ -68,7 +68,7 @@ import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer; */ public abstract class AbstractChannel extends AbstractInnerCloseable - implements Channel, ExecutorServiceConfigurer { + implements Channel, ExecutorServiceCarrier { /** * Default growth factor function used to resize response buffers @@ -95,7 +95,6 @@ public abstract class AbstractChannel private int recipient = -1; private Session sessionInstance; private ExecutorService executor; - private boolean shutdownExecutor; private final List<RequestHandler<Channel>> requestHandlers = new CopyOnWriteArrayList<>(); private final Window localWindow; @@ -115,19 +114,20 @@ public abstract class AbstractChannel } protected AbstractChannel(boolean client, Collection<? extends RequestHandler<Channel>> handlers) { - this("", client, handlers); + this("", client, handlers, null); } protected AbstractChannel(String discriminator, boolean client) { - this(discriminator, client, Collections.emptyList()); + this(discriminator, client, Collections.emptyList(), null); } - protected AbstractChannel(String discriminator, boolean client, Collection<? extends RequestHandler<Channel>> handlers) { + protected AbstractChannel(String discriminator, boolean client, Collection<? extends RequestHandler<Channel>> handlers, ExecutorService executorService) { super(discriminator); gracefulFuture = new DefaultCloseFuture(discriminator, lock); localWindow = new Window(this, null, client, true); remoteWindow = new Window(this, null, client, false); channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, getClass().getClassLoader(), channelListeners); + executor = executorService; addRequestHandlers(handlers); } @@ -189,21 +189,6 @@ public abstract class AbstractChannel } @Override - public void setExecutorService(ExecutorService service) { - executor = service; - } - - @Override - public boolean isShutdownOnExit() { - return shutdownExecutor; - } - - @Override - public void setShutdownOnExit(boolean shutdown) { - shutdownExecutor = shutdown; - } - - @Override public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() { return channelStreamPacketWriterResolver; } @@ -655,7 +640,7 @@ public abstract class AbstractChannel } ExecutorService service = getExecutorService(); - if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) { + if ((service != null) && (!service.isShutdown())) { Collection<?> running = service.shutdownNow(); if (debugEnabled) { log.debug("close({})[immediately={}] shutdown executor service on close - running count={}", http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java index 5167365..243416c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java @@ -19,12 +19,13 @@ package org.apache.sshd.common.io; -import java.util.concurrent.ExecutorService; +import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.FactoryManagerHolder; import org.apache.sshd.common.util.closeable.AbstractCloseable; +import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; /** @@ -36,12 +37,10 @@ public abstract class AbstractIoServiceFactory private final FactoryManager manager; private final ExecutorService executor; - private final boolean shutdownExecutor; - protected AbstractIoServiceFactory(FactoryManager factoryManager, ExecutorService executorService, boolean shutdownOnExit) { - manager = factoryManager; - executor = executorService; - shutdownExecutor = shutdownOnExit; + protected AbstractIoServiceFactory(FactoryManager factoryManager, ExecutorService executorService) { + manager = Objects.requireNonNull(factoryManager); + executor = Objects.requireNonNull(executorService); } @Override @@ -55,15 +54,10 @@ public abstract class AbstractIoServiceFactory } @Override - public final boolean isShutdownOnExit() { - return shutdownExecutor; - } - - @Override protected void doCloseImmediately() { try { ExecutorService service = getExecutorService(); - if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) { + if ((service != null) && (!service.isShutdown())) { log.debug("Shutdown executor"); service.shutdownNow(); if (service.awaitTermination(5, TimeUnit.SECONDS)) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java index 1ea4e6e..91f3942 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java @@ -19,54 +19,37 @@ package org.apache.sshd.common.io; -import java.util.concurrent.ExecutorService; - +import org.apache.sshd.common.Factory; import org.apache.sshd.common.util.logging.AbstractLoggingBean; -import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer; +import org.apache.sshd.common.util.threads.ExecutorService; /** * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public abstract class AbstractIoServiceFactoryFactory extends AbstractLoggingBean - implements IoServiceFactoryFactory, ExecutorServiceConfigurer { + implements IoServiceFactoryFactory { - private ExecutorService executorService; - private boolean shutdownExecutor; + private Factory<ExecutorService> executorServiceFactory; /** - * @param executors The {@link ExecutorService} to use for spawning threads. - * If {@code null} then an internal service is allocated - in which case it - * is automatically shutdown regardless of the value of the <tt>shutdownOnExit</tt> - * parameter value - * @param shutdownOnExit If {@code true} then the {@link ExecutorService#shutdownNow()} - * will be called (unless it is an internally allocated service which is always - * closed) + * @param factory The {@link ExecutorService} factory to use for spawning threads. + * If {@code null} then an internal service will be allocated. */ - protected AbstractIoServiceFactoryFactory(ExecutorService executors, boolean shutdownOnExit) { - executorService = executors; - shutdownExecutor = shutdownOnExit; - } - - @Override - public ExecutorService getExecutorService() { - return executorService; + protected AbstractIoServiceFactoryFactory(Factory<ExecutorService> factory) { + executorServiceFactory = factory; } - @Override - public void setExecutorService(ExecutorService service) { - executorService = service; - + public Factory<ExecutorService> getExecutorServiceFactory() { + return executorServiceFactory; } @Override - public boolean isShutdownOnExit() { - return shutdownExecutor; + public void setExecutorServiceFactory(Factory<ExecutorService> factory) { + executorServiceFactory = factory; } - @Override - public void setShutdownOnExit(boolean shutdown) { - shutdownExecutor = shutdown; + protected ExecutorService newExecutor() { + return executorServiceFactory != null ? executorServiceFactory.create() : null; } - } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java index 74775ff..f30331a 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java @@ -20,11 +20,11 @@ package org.apache.sshd.common.io; import java.util.Iterator; import java.util.ServiceLoader; -import java.util.concurrent.ExecutorService; +import org.apache.sshd.common.Factory; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.util.GenericUtils; -import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer; +import org.apache.sshd.common.util.threads.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,11 +37,11 @@ public class DefaultIoServiceFactoryFactory extends AbstractIoServiceFactoryFact private IoServiceFactoryFactory factory; protected DefaultIoServiceFactoryFactory() { - this(null, true); + this(null); } - protected DefaultIoServiceFactoryFactory(ExecutorService executors, boolean shutdownOnExit) { - super(executors, shutdownOnExit); + protected DefaultIoServiceFactoryFactory(Factory<ExecutorService> factory) { + super(factory); } @Override @@ -57,10 +57,9 @@ public class DefaultIoServiceFactoryFactory extends AbstractIoServiceFactoryFact synchronized (this) { if (factory == null) { factory = newInstance(IoServiceFactoryFactory.class); - if (factory instanceof ExecutorServiceConfigurer) { - ExecutorServiceConfigurer configurer = (ExecutorServiceConfigurer) factory; - configurer.setExecutorService(getExecutorService()); - configurer.setShutdownOnExit(isShutdownOnExit()); + Factory<ExecutorService> executorServiceFactory = getExecutorServiceFactory(); + if (executorServiceFactory != null) { + factory.setExecutorServiceFactory(executorServiceFactory); } } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java index 92f1e73..74b2237 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java @@ -18,13 +18,16 @@ */ package org.apache.sshd.common.io; +import org.apache.sshd.common.Factory; import org.apache.sshd.common.FactoryManager; +import org.apache.sshd.common.util.threads.ExecutorService; /** * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -@FunctionalInterface public interface IoServiceFactoryFactory { IoServiceFactory create(FactoryManager manager); + + void setExecutorServiceFactory(Factory<ExecutorService> factory); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java index 1cdb651..12fd522 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java @@ -20,7 +20,6 @@ package org.apache.sshd.common.io.nio2; import java.io.IOException; import java.nio.channels.AsynchronousChannelGroup; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.sshd.common.FactoryManager; @@ -29,6 +28,7 @@ import org.apache.sshd.common.io.AbstractIoServiceFactory; import org.apache.sshd.common.io.IoAcceptor; import org.apache.sshd.common.io.IoConnector; import org.apache.sshd.common.io.IoHandler; +import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.common.util.threads.ThreadUtils; /** @@ -38,12 +38,11 @@ public class Nio2ServiceFactory extends AbstractIoServiceFactory { private final AsynchronousChannelGroup group; - public Nio2ServiceFactory(FactoryManager factoryManager, ExecutorService service, boolean shutdownOnExit) { + public Nio2ServiceFactory(FactoryManager factoryManager, ExecutorService service) { super(factoryManager, - service == null ? ThreadUtils.newFixedThreadPool(factoryManager.toString() + "-nio2", getNioWorkers(factoryManager)) : service, - service == null || shutdownOnExit); + ThreadUtils.newFixedThreadPoolIf(service, factoryManager.toString() + "-nio2", getNioWorkers(factoryManager))); try { - group = AsynchronousChannelGroup.withThreadPool(ThreadUtils.protectExecutorServiceShutdown(getExecutorService(), isShutdownOnExit())); + group = AsynchronousChannelGroup.withThreadPool(ThreadUtils.noClose(getExecutorService())); } catch (IOException e) { log.warn("Failed (" + e.getClass().getSimpleName() + " to start async. channel group: " + e.getMessage()); if (log.isDebugEnabled()) { @@ -71,12 +70,10 @@ public class Nio2ServiceFactory extends AbstractIoServiceFactory { group.shutdownNow(); // if we protect the executor then the await will fail since we didn't really shut it down... - if (isShutdownOnExit()) { - if (group.awaitTermination(5, TimeUnit.SECONDS)) { - log.debug("Group successfully shut down"); - } else { - log.debug("Not all group tasks terminated"); - } + if (group.awaitTermination(5, TimeUnit.SECONDS)) { + log.debug("Group successfully shut down"); + } else { + log.debug("Not all group tasks terminated"); } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java index 3f64b81..d99874c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java @@ -20,18 +20,19 @@ package org.apache.sshd.common.io.nio2; import java.nio.channels.AsynchronousChannel; import java.util.Objects; -import java.util.concurrent.ExecutorService; +import org.apache.sshd.common.Factory; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.io.AbstractIoServiceFactoryFactory; import org.apache.sshd.common.io.IoServiceFactory; +import org.apache.sshd.common.util.threads.ExecutorService; /** */ public class Nio2ServiceFactoryFactory extends AbstractIoServiceFactoryFactory { public Nio2ServiceFactoryFactory() { - this(null, true); + this(null); } /** @@ -39,18 +40,15 @@ public class Nio2ServiceFactoryFactory extends AbstractIoServiceFactoryFactory { * If {@code null} then an internal service is allocated - in which case it * is automatically shutdown regardless of the value of the <tt>shutdownOnExit</tt> * parameter value - * @param shutdownOnExit If {@code true} then the {@link ExecutorService#shutdownNow()} - * will be called (unless it is an internally allocated service which is always - * closed) */ - public Nio2ServiceFactoryFactory(ExecutorService executors, boolean shutdownOnExit) { - super(executors, shutdownOnExit); + public Nio2ServiceFactoryFactory(Factory<ExecutorService> executors) { + super(executors); // Make sure NIO2 is available Objects.requireNonNull(AsynchronousChannel.class, "Missing NIO2 class"); } @Override public IoServiceFactory create(FactoryManager manager) { - return new Nio2ServiceFactory(manager, getExecutorService(), isShutdownOnExit()); + return new Nio2ServiceFactory(manager, newExecutor()); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java index 78b1257..cd711e0 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java @@ -52,7 +52,6 @@ import org.apache.sshd.common.forward.PortForwardingEventListenerManager; import org.apache.sshd.common.io.AbstractIoWriteFuture; import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.session.ConnectionService; -import org.apache.sshd.common.session.Session; import org.apache.sshd.common.session.UnknownChannelReferenceHandler; import org.apache.sshd.common.util.EventListenerUtils; import org.apache.sshd.common.util.GenericUtils; @@ -66,10 +65,9 @@ import org.apache.sshd.server.x11.X11ForwardSupport; /** * Base implementation of ConnectionService. * - * @param <S> Type of {@link AbstractSession} being used * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public abstract class AbstractConnectionService<S extends AbstractSession> +public abstract class AbstractConnectionService extends AbstractInnerCloseable implements ConnectionService { /** @@ -105,10 +103,10 @@ public abstract class AbstractConnectionService<S extends AbstractSession> private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>(); private final Collection<PortForwardingEventListenerManager> managersHolder = new CopyOnWriteArraySet<>(); private final PortForwardingEventListener listenerProxy; - private final S sessionInstance; + private final AbstractSession sessionInstance; private UnknownChannelReferenceHandler unknownChannelReferenceHandler; - protected AbstractConnectionService(S session) { + protected AbstractConnectionService(AbstractSession session) { sessionInstance = Objects.requireNonNull(session, "No session"); listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners); } @@ -166,7 +164,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> } @Override - public S getSession() { + public AbstractSession getSession() { return sessionInstance; } @@ -178,7 +176,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> @Override public ForwardingFilter getForwardingFilter() { ForwardingFilter forwarder; - S session = getSession(); + AbstractSession session = getSession(); synchronized (forwarderHolder) { forwarder = forwarderHolder.get(); if (forwarder != null) { @@ -202,7 +200,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> super.preClose(); } - protected ForwardingFilter createForwardingFilter(S session) { + protected ForwardingFilter createForwardingFilter(AbstractSession session) { FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager"); ForwardingFilterFactory factory = @@ -215,7 +213,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> @Override public X11ForwardSupport getX11ForwardSupport() { X11ForwardSupport x11Support; - S session = getSession(); + AbstractSession session = getSession(); synchronized (x11ForwardHolder) { x11Support = x11ForwardHolder.get(); if (x11Support != null) { @@ -232,14 +230,14 @@ public abstract class AbstractConnectionService<S extends AbstractSession> return x11Support; } - protected X11ForwardSupport createX11ForwardSupport(S session) { + protected X11ForwardSupport createX11ForwardSupport(AbstractSession session) { return new DefaultX11ForwardSupport(this); } @Override public AgentForwardSupport getAgentForwardSupport() { AgentForwardSupport agentForward; - S session = getSession(); + AbstractSession session = getSession(); synchronized (agentForwardHolder) { agentForward = agentForwardHolder.get(); if (agentForward != null) { @@ -257,7 +255,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> return agentForward; } - protected AgentForwardSupport createAgentForwardSupport(S session) { + protected AgentForwardSupport createAgentForwardSupport(AbstractSession session) { return new DefaultAgentForwardSupport(this); } @@ -275,7 +273,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> @Override public int registerChannel(Channel channel) throws IOException { - Session session = getSession(); + AbstractSession session = getSession(); int maxChannels = session.getIntProperty(MAX_CONCURRENT_CHANNELS_PROP, DEFAULT_MAX_CHANNELS); int curSize = channels.size(); if (curSize > maxChannels) { @@ -584,7 +582,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> return handler; } - Session s = getSession(); + AbstractSession s = getSession(); return (s == null) ? null : s.resolveUnknownChannelReferenceHandler(); } @@ -614,7 +612,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> return; } - Session session = getSession(); + AbstractSession session = getSession(); FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager"); Channel channel = NamedFactory.create(manager.getChannelFactories(), type); if (channel == null) { @@ -673,7 +671,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> this, sender, SshConstants.getOpenErrorCodeName(reasonCode), lang, message); } - Session session = getSession(); + AbstractSession session = getSession(); Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE, Long.SIZE + GenericUtils.length(message) + GenericUtils.length(lang)); buf.putInt(sender); @@ -701,7 +699,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> this, req, wantReply); } - Session session = getSession(); + AbstractSession session = getSession(); FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager"); Collection<RequestHandler<ConnectionService>> handlers = manager.getGlobalRequestHandlers(); if (GenericUtils.size(handlers) > 0) { @@ -755,18 +753,18 @@ public abstract class AbstractConnectionService<S extends AbstractSession> byte cmd = RequestHandler.Result.ReplySuccess.equals(result) ? SshConstants.SSH_MSG_REQUEST_SUCCESS : SshConstants.SSH_MSG_REQUEST_FAILURE; - Session session = getSession(); + AbstractSession session = getSession(); Buffer rsp = session.createBuffer(cmd, 2); return session.writePacket(rsp); } protected void requestSuccess(Buffer buffer) throws Exception { - S s = getSession(); + AbstractSession s = (AbstractSession) getSession(); s.requestSuccess(buffer); } protected void requestFailure(Buffer buffer) throws Exception { - S s = getSession(); + AbstractSession s = (AbstractSession) getSession(); s.requestFailure(buffer); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java new file mode 100644 index 0000000..fe7ccbe --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.common.util.threads; + +import org.apache.sshd.common.Closeable; + +public interface ExecutorService extends java.util.concurrent.ExecutorService, Closeable { + +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java index 71c8c36..7e9378b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java @@ -19,8 +19,6 @@ package org.apache.sshd.common.util.threads; -import java.util.concurrent.ExecutorService; - /** * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ @@ -30,10 +28,4 @@ public interface ExecutorServiceCarrier { */ ExecutorService getExecutorService(); - /** - * @return If {@code true} then the {@link ExecutorService#shutdownNow()} - * will be called (unless it is an internally allocated service which is always - * closed) - */ - boolean isShutdownOnExit(); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java deleted file mode 100644 index e8519c8..0000000 --- a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sshd.common.util.threads; - -import java.util.concurrent.ExecutorService; - -/** - * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> - */ -public interface ExecutorServiceConfigurer extends ExecutorServiceCarrier { - void setExecutorService(ExecutorService service); - - void setShutdownOnExit(boolean shutdown); -} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java index 803e87e..34a935b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java @@ -18,26 +18,33 @@ */ package org.apache.sshd.common.util.threads; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; +import java.io.IOException; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.future.DefaultCloseFuture; +import org.apache.sshd.common.future.SshFutureListener; +import org.apache.sshd.common.util.closeable.AbstractCloseable; import org.apache.sshd.common.util.logging.AbstractLoggingBean; /** @@ -66,34 +73,17 @@ public final class ThreadUtils { * value of the <tt>shutdownOnExit</tt> parameter */ public static ExecutorService protectExecutorServiceShutdown(final ExecutorService executorService, boolean shutdownOnExit) { - if (executorService == null || shutdownOnExit) { + if (executorService == null || shutdownOnExit || executorService instanceof NoCloseExecutor) { return executorService; } else { - return (ExecutorService) Proxy.newProxyInstance( - resolveDefaultClassLoader(executorService), - new Class<?>[]{ExecutorService.class}, - new InvocationHandler() { - private final AtomicBoolean stopped = new AtomicBoolean(false); - - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - String name = method.getName(); - if ("isShutdown".equals(name)) { - return stopped.get(); - } else if ("shutdown".equals(name)) { - stopped.set(true); - return null; // void... - } else if ("shutdownNow".equals(name)) { - stopped.set(true); - return Collections.emptyList(); - } else { - return method.invoke(executorService, args); - } - } - }); + return new NoCloseExecutor(executorService); } } + public static ExecutorService noClose(ExecutorService executorService) { + return protectExecutorServiceShutdown(executorService, false); + } + public static ClassLoader resolveDefaultClassLoader(Object anchor) { return resolveDefaultClassLoader(anchor == null ? null : anchor.getClass()); } @@ -180,16 +170,26 @@ public final class ThreadUtils { return cls; } + public static ExecutorService newFixedThreadPoolIf(ExecutorService executorService, String poolName, int nThreads) { + return executorService == null ? newFixedThreadPool(poolName, nThreads) : executorService; + } + public static ExecutorService newFixedThreadPool(String poolName, int nThreads) { - return new ThreadPoolExecutor(nThreads, nThreads, + return new ThreadPoolExecutor( + nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, // TODO make this configurable new LinkedBlockingQueue<>(), new SshdThreadFactory(poolName), new ThreadPoolExecutor.CallerRunsPolicy()); } + public static ExecutorService newCachedThreadPoolIf(ExecutorService executorService, String poolName) { + return executorService == null ? newCachedThreadPool(poolName) : executorService; + } + public static ExecutorService newCachedThreadPool(String poolName) { - return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // TODO make this configurable + return new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, // TODO make this configurable 60L, TimeUnit.SECONDS, // TODO make this configurable new SynchronousQueue<>(), new SshdThreadFactory(poolName), @@ -248,4 +248,218 @@ public final class ThreadUtils { return t; } } + + public static class NoCloseExecutor implements ExecutorService { + + protected final ExecutorService executor; + protected final CloseFuture closeFuture; + + public NoCloseExecutor(ExecutorService executor) { + this.executor = executor; + closeFuture = new DefaultCloseFuture(null, null); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return executor.submit(task); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return executor.submit(task, result); + } + + @Override + public Future<?> submit(Runnable task) { + return executor.submit(task); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + return executor.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return executor.invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + return executor.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return executor.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + executor.execute(command); + } + + @Override + public void shutdown() { + close(true); + } + + @Override + public List<Runnable> shutdownNow() { + close(true); + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return isClosed(); + } + + @Override + public boolean isTerminated() { + return isClosed(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + try { + return closeFuture.await(timeout, unit); + } catch (IOException e) { + throw (InterruptedException) new InterruptedException().initCause(e); + } + } + + @Override + public CloseFuture close(boolean immediately) { + closeFuture.setClosed(); + return closeFuture; + } + + @Override + public void addCloseFutureListener(SshFutureListener<CloseFuture> listener) { + closeFuture.addListener(listener); + } + + @Override + public void removeCloseFutureListener(SshFutureListener<CloseFuture> listener) { + closeFuture.removeListener(listener); + } + + @Override + public boolean isClosed() { + return closeFuture.isClosed(); + } + + @Override + public boolean isClosing() { + return isClosed(); + } + + } + + public static class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor implements ExecutorService { + + final DelegateCloseable closeable = new DelegateCloseable(); + + class DelegateCloseable extends AbstractCloseable { + DelegateCloseable() { + } + + @Override + protected CloseFuture doCloseGracefully() { + shutdown(); + return closeFuture; + } + + @Override + protected void doCloseImmediately() { + shutdownNow(); + super.doCloseImmediately(); + } + + void setClosed() { + closeFuture.setClosed(); + } + } + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + BlockingQueue<Runnable> workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + @Override + protected void terminated() { + closeable.doCloseImmediately(); + } + + @Override + public void shutdown() { + super.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + return super.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return super.isShutdown(); + } + + @Override + public boolean isTerminating() { + return super.isTerminating(); + } + + @Override + public boolean isTerminated() { + return super.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return super.awaitTermination(timeout, unit); + } + + @Override + public CloseFuture close(boolean immediately) { + return closeable.close(immediately); + } + + @Override + public void addCloseFutureListener(SshFutureListener<CloseFuture> listener) { + closeable.addCloseFutureListener(listener); + } + + @Override + public void removeCloseFutureListener(SshFutureListener<CloseFuture> listener) { + closeable.removeCloseFutureListener(listener); + } + + @Override + public boolean isClosed() { + return closeable.isClosed(); + } + + @Override + public boolean isClosing() { + return closeable.isClosing(); + } + } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java index 2f3a766..b9da5a7 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java @@ -35,6 +35,7 @@ import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.server.session.ServerSession; /** @@ -46,17 +47,19 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S protected final AtomicBoolean exitStatusSent = new AtomicBoolean(false); - protected AbstractServerChannel() { - this(Collections.emptyList()); + protected AbstractServerChannel(ExecutorService executor) { + super("", false, Collections.emptyList(), executor); } - protected AbstractServerChannel(Collection<? extends RequestHandler<Channel>> handlers) { - this("", handlers); + protected AbstractServerChannel(String discriminator, Collection<? extends RequestHandler<Channel>> handlers, ExecutorService executor) { + super(discriminator, false, handlers, executor); } - protected AbstractServerChannel(String discriminator, Collection<? extends RequestHandler<Channel>> handlers) { - super(discriminator, false, handlers); - } +// TODO: investigate how to fix the forwarding channel failures when enabled +// @Override +// public ServerSession getSession() { +// return (ServerSession) super.getSession(); +// } @Override public ServerSession getServerSession() { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index cfaa499..78f5861 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -77,7 +77,7 @@ import org.apache.sshd.server.session.ServerSession; import org.apache.sshd.server.x11.X11ForwardSupport; /** - * TODO Add javadoc + * TODO Add javadocWindowInitTest * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ @@ -102,12 +102,17 @@ public class ChannelSession extends AbstractServerChannel { } public ChannelSession(Collection<? extends RequestHandler<Channel>> handlers) { - super(handlers); + super("", handlers, null); commandExitFuture = new DefaultCloseFuture(getClass().getSimpleName(), lock); } @Override + public ServerSession getSession() { + return (ServerSession) super.getSession(); + } + + @Override public void handleWindowAdjust(Buffer buffer) throws IOException { super.handleWindowAdjust(buffer); if (asyncOut != null) {