[SSHD-706] Re-factored the code that invokes the various listeners to avoid using proxies
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/38c84a83 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/38c84a83 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/38c84a83 Branch: refs/heads/master Commit: 38c84a830e5078b62912cb8131f83e8ac82b2742 Parents: 6c92dcc Author: Lyor Goldstein <lyor.goldst...@gmail.com> Authored: Sun Nov 6 18:34:00 2016 +0200 Committer: Lyor Goldstein <lyor.goldst...@gmail.com> Committed: Sun Nov 6 18:34:00 2016 +0200 ---------------------------------------------------------------------- .../agent/local/ChannelAgentForwarding.java | 30 +- .../sshd/agent/unix/ChannelAgentForwarding.java | 25 +- .../client/channel/AbstractClientChannel.java | 24 +- .../session/ClientConnectionServiceFactory.java | 7 +- .../sshd/client/session/ClientSessionImpl.java | 26 +- .../sshd/common/channel/AbstractChannel.java | 193 ++++++-- .../sshd/common/channel/ChannelListener.java | 5 + .../common/forward/DefaultTcpipForwarder.java | 440 +++++++++++++++++-- .../forward/DefaultTcpipForwarderFactory.java | 16 +- .../forward/PortForwardingEventListener.java | 4 + ...ortForwardingEventListenerManagerHolder.java | 38 ++ .../sshd/common/forward/TcpipForwarder.java | 6 +- .../common/helpers/AbstractFactoryManager.java | 31 +- .../common/scp/ScpTransferEventListener.java | 4 + .../AbstractConnectionServiceFactory.java | 4 +- .../sshd/common/session/ConnectionService.java | 3 +- .../sshd/common/session/SessionListener.java | 4 + .../helpers/AbstractConnectionService.java | 61 +-- .../common/session/helpers/AbstractSession.java | 270 +++++++++--- .../sshd/common/util/EventListenerUtils.java | 21 +- .../org/apache/sshd/common/util/Invoker.java | 113 +++++ .../sshd/common/util/SshdEventListener.java | 17 +- .../org/apache/sshd/server/SignalListener.java | 4 + .../apache/sshd/server/StandardEnvironment.java | 13 +- .../server/channel/AbstractServerChannel.java | 27 +- 
.../sshd/server/forward/TcpipServerChannel.java | 71 +-- .../sshd/server/scp/ScpCommandFactory.java | 6 +- .../session/ServerConnectionServiceFactory.java | 7 +- .../sshd/server/session/ServerSessionImpl.java | 27 +- .../sftp/AbstractSftpEventListenerManager.java | 13 +- .../subsystem/sftp/SftpEventListener.java | 4 + .../server/subsystem/sftp/SftpSubsystem.java | 12 +- .../test/java/org/apache/sshd/ProxyTest.java | 10 + 33 files changed, 1136 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java index d053a31..7aeb35b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java @@ -29,7 +29,6 @@ import org.apache.sshd.client.future.DefaultOpenFuture; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.SshConstants; -import org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.session.Session; @@ -53,8 +52,8 @@ public class ChannelAgentForwarding extends AbstractServerChannel { @Override protected OpenFuture doInit(Buffer buffer) { - final OpenFuture f = new DefaultOpenFuture(this); - ChannelListener listener = getChannelListenerProxy(); + OpenFuture f = new DefaultOpenFuture(this); + String changeEvent = "auth-agent"; try { out = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, 
true); @@ -64,30 +63,15 @@ public class ChannelAgentForwarding extends AbstractServerChannel { agent = factory.createClient(manager); client = new AgentClient(); - listener.channelOpenSuccess(this); + signalChannelOpenSuccess(); f.setOpened(); } catch (Throwable t) { Throwable e = GenericUtils.peelException(t); - try { - listener.channelOpenFailure(this, e); - } catch (Throwable err) { - Throwable ignored = GenericUtils.peelException(err); - log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}", - this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); - if (log.isDebugEnabled()) { - log.debug("doInit(" + this + ") inform listener open failure details", ignored); - } - - if (log.isTraceEnabled()) { - Throwable[] suppressed = ignored.getSuppressed(); - if (GenericUtils.length(suppressed) > 0) { - for (Throwable s : suppressed) { - log.trace("doInit(" + this + ") suppressed channel open failure signalling", s); - } - } - } - } + changeEvent = e.getClass().getSimpleName(); + signalChannelOpenFailure(e); f.setException(e); + } finally { + notifyStateChanged(changeEvent); } return f; } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java index ecafc34..42bc3d0 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java @@ -29,7 +29,6 @@ import org.apache.sshd.client.future.DefaultOpenFuture; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.PropertyResolverUtils; import org.apache.sshd.common.SshConstants; -import 
org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.util.GenericUtils; @@ -73,8 +72,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel { @Override protected OpenFuture doInit(Buffer buffer) { - final OpenFuture f = new DefaultOpenFuture(this); - ChannelListener listener = getChannelListenerProxy(); + OpenFuture f = new DefaultOpenFuture(this); try { out = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); authSocket = PropertyResolverUtils.getString(this, SshAgent.SSH_AUTHSOCKET_ENV_NAME); @@ -108,28 +106,11 @@ public class ChannelAgentForwarding extends AbstractServerChannel { } }); - listener.channelOpenSuccess(this); + signalChannelOpenSuccess(); f.setOpened(); } catch (Throwable t) { Throwable e = GenericUtils.peelException(t); - try { - listener.channelOpenFailure(this, e); - } catch (Throwable err) { - Throwable ignored = GenericUtils.peelException(err); - log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}", - this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); - if (log.isDebugEnabled()) { - log.debug("doInit(" + this + ") inform listener open failure details", ignored); - } - if (log.isTraceEnabled()) { - Throwable[] suppressed = ignored.getSuppressed(); - if (GenericUtils.length(suppressed) > 0) { - for (Throwable s : suppressed) { - log.trace("doInit(" + this + ") suppressed channel open failure signalling", s); - } - } - } - } + signalChannelOpenFailure(e); f.setException(e); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index 4c3aee3..6adf2f6 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -41,7 +41,6 @@ import org.apache.sshd.common.channel.AbstractChannel; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelAsyncInputStream; import org.apache.sshd.common.channel.ChannelAsyncOutputStream; -import org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.channel.RequestHandler; import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.io.IoInputStream; @@ -328,36 +327,17 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C Window wRemote = getRemoteWindow(); wRemote.init(rwSize, packetSize, manager.getProperties()); - ChannelListener listener = getChannelListenerProxy(); String changeEvent = "SSH_MSG_CHANNEL_OPEN_CONFIRMATION"; try { doOpen(); - listener.channelOpenSuccess(this); + signalChannelOpenSuccess(); this.opened.set(true); this.openFuture.setOpened(); } catch (Throwable t) { Throwable e = GenericUtils.peelException(t); changeEvent = e.getClass().getName(); - try { - listener.channelOpenFailure(this, e); - } catch (Throwable err) { - Throwable ignored = GenericUtils.peelException(err); - log.warn("handleOpenSuccess({}) failed ({}) to inform listener of open failure={}: {}", - this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); - if (log.isDebugEnabled()) { - log.debug("handleOpenSuccess(" + this + ") inform listener open failure details", ignored); - } - if (log.isTraceEnabled()) { - Throwable[] suppressed = ignored.getSuppressed(); - if (GenericUtils.length(suppressed) > 0) { - for (Throwable s : suppressed) { - log.trace("handleOpenSuccess(" + this + ") suppressed channel open failure signalling", s); - 
} - } - } - } - + signalChannelOpenFailure(e); this.openFuture.setException(e); this.closeFuture.setClosed(); this.doCloseImmediately(); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java index 9a7505a..d8f90d0 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java @@ -41,6 +41,11 @@ public class ClientConnectionServiceFactory extends AbstractConnectionServiceFac public void removePortForwardingEventListener(PortForwardingEventListener listener) { throw new UnsupportedOperationException("removePortForwardingEventListener(" + listener + ") N/A on default instance"); } + + @Override + public PortForwardingEventListener getPortForwardingEventListenerProxy() { + return PortForwardingEventListener.EMPTY; + } }; public ClientConnectionServiceFactory() { @@ -57,7 +62,7 @@ public class ClientConnectionServiceFactory extends AbstractConnectionServiceFac AbstractClientSession abstractSession = ValidateUtils.checkInstanceOf(session, AbstractClientSession.class, "Not a client session: %s", session); ClientConnectionService service = new ClientConnectionService(abstractSession); - service.addPortForwardingEventListener(getPortForwardingEventListenerProxy()); + service.addPortForwardingEventListenerManager(this); return service; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java ---------------------------------------------------------------------- 
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java index 28a72ad..4ae713f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java @@ -32,7 +32,6 @@ import java.util.Set; import org.apache.sshd.client.ClientFactoryManager; import org.apache.sshd.client.future.AuthFuture; import org.apache.sshd.client.future.DefaultAuthFuture; -import org.apache.sshd.common.RuntimeSshException; import org.apache.sshd.common.Service; import org.apache.sshd.common.ServiceFactory; import org.apache.sshd.common.SshConstants; @@ -87,26 +86,7 @@ public class ClientSessionImpl extends AbstractClientSession { authFuture = new DefaultAuthFuture(lock); authFuture.setAuthed(false); - // Inform the listener of the newly created session - SessionListener listener = getSessionListenerProxy(); - try { - listener.sessionCreated(this); - } catch (Throwable t) { - Throwable e = GenericUtils.peelException(t); - if (log.isDebugEnabled()) { - log.debug("Failed ({}) to announce session={} created: {}", - e.getClass().getSimpleName(), ioSession, e.getMessage()); - } - if (log.isTraceEnabled()) { - log.trace("Session=" + ioSession + " creation failure details", e); - } - if (e instanceof Exception) { - throw (Exception) e; - } else { - throw new RuntimeSshException(e); - } - } - + signalSessionCreated(ioSession); sendClientIdentification(); kexState.set(KexState.INIT); sendKexInit(); @@ -188,14 +168,14 @@ public class ClientSessionImpl extends AbstractClientSession { } @Override - protected void sendSessionEvent(SessionListener.Event event) throws IOException { + protected void signalSessionEvent(SessionListener.Event event) throws IOException { if (SessionListener.Event.KeyEstablished.equals(event)) { sendInitialServiceRequest(); } synchronized (lock) { lock.notifyAll(); 
} - super.sendSessionEvent(event); + super.signalSessionEvent(event); } protected void sendInitialServiceRequest() throws IOException { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index b4810b2..71cb861 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -50,6 +51,7 @@ import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.EventListenerUtils; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.Int2IntFunction; +import org.apache.sshd.common.util.Invoker; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.BufferUtils; @@ -85,8 +87,7 @@ public abstract class AbstractChannel /** * Channel events listener */ - protected final Collection<ChannelListener> channelListeners = - EventListenerUtils.synchronizedListenersSet(); + protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>(); protected final ChannelListener channelListenerProxy; private int id = -1; @@ -352,11 +353,19 @@ public abstract class AbstractChannel this.sessionInstance = session; this.id = id; - ChannelListener listener = 
session.getChannelListenerProxy(); + signalChannelInitialized(); + configureWindow(); + initialized.set(true); + } + + protected void signalChannelInitialized() throws IOException { try { - listener.channelInitialized(this); - } catch (Throwable t) { - Throwable e = GenericUtils.peelException(t); + invokeChannelSignaller(l -> { + signalChannelInitialized(l); + return null; + }); + } catch (Throwable err) { + Throwable e = GenericUtils.peelException(err); if (e instanceof IOException) { throw (IOException) e; } else if (e instanceof RuntimeException) { @@ -365,10 +374,39 @@ public abstract class AbstractChannel throw new IOException("Failed (" + e.getClass().getSimpleName() + ") to notify channel " + this + " initialization: " + e.getMessage(), e); } } - // delegate the rest of the notifications to the channel - addChannelListener(listener); - configureWindow(); - initialized.set(true); + } + + protected void signalChannelInitialized(ChannelListener listener) { + if (listener == null) { + return; + } + + listener.channelInitialized(this); + } + + protected void signalChannelOpenSuccess() { + try { + invokeChannelSignaller(l -> { + signalChannelOpenSuccess(l); + return null; + }); + } catch (Throwable err) { + if (err instanceof RuntimeException) { + throw (RuntimeException) err; + } else if (err instanceof Error) { + throw (Error) err; + } else { + throw new RuntimeException(err); + } + } + } + + protected void signalChannelOpenSuccess(ChannelListener listener) { + if (listener == null) { + return; + } + + listener.channelOpenSuccess(this); } @Override @@ -376,27 +414,70 @@ public abstract class AbstractChannel return initialized.get(); } + protected void signalChannelOpenFailure(Throwable reason) { + try { + invokeChannelSignaller(l -> { + signalChannelOpenFailure(l, reason); + return null; + }); + } catch (Throwable err) { + Throwable ignored = GenericUtils.peelException(err); + log.warn("signalChannelOpenFailure({}) failed ({}) to inform listener of open 
failure={}: {}", + this, ignored.getClass().getSimpleName(), reason.getClass().getSimpleName(), ignored.getMessage()); + if (log.isDebugEnabled()) { + log.debug("doInit(" + this + ") inform listener open failure details", ignored); + } + + if (log.isTraceEnabled()) { + Throwable[] suppressed = ignored.getSuppressed(); + if (GenericUtils.length(suppressed) > 0) { + for (Throwable s : suppressed) { + log.trace("signalChannelOpenFailure(" + this + ") suppressed channel open failure signalling", s); + } + } + } + } + } + + protected void signalChannelOpenFailure(ChannelListener listener, Throwable reason) { + if (listener == null) { + return; + } + + listener.channelOpenFailure(this, reason); + } + protected void notifyStateChanged(String hint) { - ChannelListener listener = getChannelListenerProxy(); try { - listener.channelStateChanged(this, hint); - } catch (Throwable t) { - Throwable e = GenericUtils.peelException(t); + invokeChannelSignaller(l -> { + notifyStateChanged(l, hint); + return null; + }); + } catch (Throwable err) { + Throwable e = GenericUtils.peelException(err); log.warn("notifyStateChanged({})[{}] {} while signal channel state change: {}", this, hint, e.getClass().getSimpleName(), e.getMessage()); if (log.isDebugEnabled()) { log.debug("notifyStateChanged(" + this + ")[" + hint + "] channel state signalling failure details", e); } + } finally { + synchronized (lock) { + lock.notifyAll(); + } } + } - synchronized (lock) { - lock.notifyAll(); + protected void notifyStateChanged(ChannelListener listener, String hint) { + if (listener == null) { + return; } + + listener.channelStateChanged(this, hint); } @Override public void addChannelListener(ChannelListener listener) { - ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this); + ChannelListener.validateListener(listener); // avoid race conditions on notifications while channel is being closed if (!isOpen()) { log.warn("addChannelListener({})[{}] ignore registration while 
channel is closing", this, listener); @@ -416,6 +497,11 @@ public abstract class AbstractChannel @Override public void removeChannelListener(ChannelListener listener) { + if (listener == null) { + return; + } + + ChannelListener.validateListener(listener); if (this.channelListeners.remove(listener)) { if (log.isTraceEnabled()) { log.trace("removeChannelListener({})[{}] removed", this, listener); @@ -583,23 +669,8 @@ public abstract class AbstractChannel @Override protected void preClose() { - ChannelListener listener = getChannelListenerProxy(); try { - listener.channelClosed(this, null); - } catch (Throwable t) { - Throwable e = GenericUtils.peelException(t); - log.warn("preClose({}) {} while signal channel closed: {}", this, e.getClass().getSimpleName(), e.getMessage()); - if (log.isDebugEnabled()) { - log.debug("preClose(" + this + ") channel closed signalling failure details", e); - } - if (log.isTraceEnabled()) { - Throwable[] suppressed = e.getSuppressed(); - if (GenericUtils.length(suppressed) > 0) { - for (Throwable s : suppressed) { - log.trace("preClose(" + this + ") suppressed closed channel signalling failure", s); - } - } - } + signalChannelClosed(null); } finally { // clear the listeners since we are closing the channel (quicker GC) this.channelListeners.clear(); @@ -624,6 +695,62 @@ public abstract class AbstractChannel super.preClose(); } + public void signalChannelClosed(Throwable reason) { + try { + invokeChannelSignaller(l -> { + signalChannelClosed(l, reason); + return null; + }); + } catch (Throwable err) { + Throwable e = GenericUtils.peelException(err); + log.warn("signalChannelClosed({}) {} while signal channel closed: {}", this, e.getClass().getSimpleName(), e.getMessage()); + if (log.isDebugEnabled()) { + log.debug("signalChannelClosed(" + this + ") channel closed signalling failure details", e); + } + if (log.isTraceEnabled()) { + Throwable[] suppressed = e.getSuppressed(); + if (GenericUtils.length(suppressed) > 0) { + for (Throwable s : 
suppressed) { + log.trace("signalChannelClosed(" + this + ") suppressed closed channel signalling failure", s); + } + } + } + } + } + + protected void signalChannelClosed(ChannelListener listener, Throwable reason) { + if (listener == null) { + return; + } + + listener.channelClosed(this, reason); + } + + protected void invokeChannelSignaller(Invoker<ChannelListener, Void> invoker) throws Throwable { + Session session = getSession(); + FactoryManager manager = (session == null) ? null : session.getFactoryManager(); + ChannelListener[] listeners = { + (manager == null) ? null : manager.getChannelListenerProxy(), + (session == null) ? null : session.getChannelListenerProxy(), + getChannelListenerProxy() + }; + + Throwable err = null; + for (ChannelListener l : listeners) { + if (l == null) { + continue; + } + try { + invoker.invoke(l); + } catch (Throwable t) { + err = GenericUtils.accumulateException(err, t); + } + } + + if (err != null) { + throw err; + } + } @Override protected void doCloseImmediately() { if (service != null) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java index 0124fca..ba0d72f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java @@ -80,6 +80,7 @@ public interface ChannelListener extends SshdEventListener { * @param channel The {@link Channel} whose state has changed * @param hint A "hint" as to the nature of the state change. 
* it can be a request name or a {@code SSH_MSG_CHANNEL_XXX} command + * or the name of an exception class */ default void channelStateChanged(Channel channel, String hint) { // ignored @@ -100,4 +101,8 @@ public interface ChannelListener extends SshdEventListener { default void channelClosed(Channel channel, Throwable reason) { // ignored } + + static <L extends ChannelListener> L validateListener(L listener) { + return SshdEventListener.validateListener(listener, ChannelListener.class.getSimpleName()); + } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java index 5462753..6288d41 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -30,6 +31,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import org.apache.sshd.client.channel.ClientChannelEvent; @@ -50,6 +52,7 @@ import org.apache.sshd.common.session.Session; import org.apache.sshd.common.session.SessionHolder; import org.apache.sshd.common.util.EventListenerUtils; import org.apache.sshd.common.util.GenericUtils; +import org.apache.sshd.common.util.Invoker; import org.apache.sshd.common.util.Readable; import org.apache.sshd.common.util.ValidateUtils; import 
org.apache.sshd.common.util.buffer.Buffer; @@ -91,8 +94,8 @@ public class DefaultTcpipForwarder private final Map<Integer, SocksProxy> dynamicLocal = new HashMap<>(); private final Set<LocalForwardingEntry> localForwards = new HashSet<>(); private final IoHandlerFactory staticIoHandlerFactory = StaticIoHandler::new; - private final Collection<PortForwardingEventListener> listeners = - EventListenerUtils.synchronizedListenersSet(); + private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>(); + private final Collection<PortForwardingEventListenerManager> managersHolder = new CopyOnWriteArraySet<>(); private final PortForwardingEventListener listenerProxy; private IoAcceptor acceptor; @@ -110,7 +113,7 @@ public class DefaultTcpipForwarder @Override public void addPortForwardingEventListener(PortForwardingEventListener listener) { - listeners.add(Objects.requireNonNull(listener, "No listener to add")); + listeners.add(PortForwardingEventListener.validateListener(listener)); } @Override @@ -119,7 +122,26 @@ public class DefaultTcpipForwarder return; } - listeners.remove(listener); + listeners.remove(PortForwardingEventListener.validateListener(listener)); + } + + @Override + public Collection<PortForwardingEventListenerManager> getRegisteredManagers() { + return managersHolder.isEmpty() ? 
Collections.emptyList() : new ArrayList<>(managersHolder); + } + + @Override + public boolean addPortForwardingEventListenerManager(PortForwardingEventListenerManager manager) { + return managersHolder.add(Objects.requireNonNull(manager, "No manager")); + } + + @Override + public boolean removePortForwardingEventListenerManager(PortForwardingEventListenerManager manager) { + if (manager == null) { + return false; + } + + return managersHolder.remove(manager); } @Override @@ -131,6 +153,25 @@ public class DefaultTcpipForwarder return service; } + protected Collection<PortForwardingEventListener> getDefaultListeners() { + Collection<PortForwardingEventListener> defaultListeners = new ArrayList<>(); + defaultListeners.add(getPortForwardingEventListenerProxy()); + + Session session = getSession(); + PortForwardingEventListener l = session.getPortForwardingEventListenerProxy(); + if (l != null) { + defaultListeners.add(l); + } + + FactoryManager manager = (session == null) ? null : session.getFactoryManager(); + l = (manager == null) ? 
null : manager.getPortForwardingEventListenerProxy(); + if (l != null) { + defaultListeners.add(l); + } + + return defaultListeners; + } + // // TcpIpForwarder implementation // @@ -150,8 +191,7 @@ public class DefaultTcpipForwarder InetSocketAddress bound; int port; - PortForwardingEventListener listener = getPortForwardingEventListenerProxy(); - listener.establishingExplicitTunnel(getSession(), local, remote, true); + signalEstablishingExplicitTunnel(local, remote, true); try { bound = doBind(local, staticIoHandlerFactory); port = bound.getPort(); @@ -169,7 +209,7 @@ public class DefaultTcpipForwarder } catch (IOException | RuntimeException err) { e.addSuppressed(err); } - listener.establishedExplicitTunnel(getSession(), local, remote, true, null, e); + signalEstablishedExplicitTunnel(local, remote, true, null, e); throw e; } @@ -178,7 +218,7 @@ public class DefaultTcpipForwarder if (log.isDebugEnabled()) { log.debug("startLocalPortForwarding(" + local + " -> " + remote + "): " + result); } - listener.establishedExplicitTunnel(getSession(), local, remote, true, result, null); + signalEstablishedExplicitTunnel(local, remote, true, result, null); return result; } catch (IOException | RuntimeException e) { stopLocalPortForwarding(local); @@ -200,16 +240,15 @@ public class DefaultTcpipForwarder log.debug("stopLocalPortForwarding(" + local + ") unbind " + bound); } - PortForwardingEventListener listener = getPortForwardingEventListenerProxy(); - listener.tearingDownExplicitTunnel(getSession(), bound, true); + signalTearingDownExplicitTunnel(bound, true); try { acceptor.unbind(bound.toInetSocketAddress()); } catch (RuntimeException e) { - listener.tornDownExplicitTunnel(getSession(), bound, true, e); + signalTornDownExplicitTunnel(bound, true, e); throw e; } - listener.tornDownExplicitTunnel(getSession(), bound, true, null); + signalTornDownExplicitTunnel(bound, true, null); } else { if (log.isDebugEnabled()) { log.debug("stopLocalPortForwarding(" + local + ") no 
mapping/acceptor for " + bound); @@ -234,8 +273,7 @@ public class DefaultTcpipForwarder long timeout = PropertyResolverUtils.getLongProperty(session, FORWARD_REQUEST_TIMEOUT, DEFAULT_FORWARD_REQUEST_TIMEOUT); Buffer result; int port; - PortForwardingEventListener listener = getPortForwardingEventListenerProxy(); - listener.establishingExplicitTunnel(getSession(), local, remote, false); + signalEstablishingExplicitTunnel(local, remote, false); try { result = session.request("tcpip-forward", buffer, timeout, TimeUnit.MILLISECONDS); if (result == null) { @@ -257,7 +295,7 @@ public class DefaultTcpipForwarder } catch (IOException | RuntimeException err) { e.addSuppressed(err); } - listener.establishedExplicitTunnel(session, local, remote, false, null, e); + signalEstablishedExplicitTunnel(local, remote, false, null, e); throw e; } @@ -267,7 +305,7 @@ public class DefaultTcpipForwarder log.debug("startRemotePortForwarding(" + remote + " -> " + local + "): " + bound); } - listener.establishedExplicitTunnel(getSession(), local, remote, false, bound, null); + signalEstablishedExplicitTunnel(local, remote, false, bound, null); return bound; } catch (IOException | RuntimeException e) { stopRemotePortForwarding(remote); @@ -295,16 +333,15 @@ public class DefaultTcpipForwarder buffer.putString(remoteHost); buffer.putInt(remote.getPort()); - PortForwardingEventListener listener = getPortForwardingEventListenerProxy(); - listener.tearingDownExplicitTunnel(getSession(), bound, false); + signalTearingDownExplicitTunnel(bound, false); try { session.writePacket(buffer); } catch (IOException | RuntimeException e) { - listener.tornDownExplicitTunnel(getSession(), bound, false, e); + signalTornDownExplicitTunnel(bound, false, e); throw e; } - listener.tornDownExplicitTunnel(getSession(), bound, false, null); + signalTornDownExplicitTunnel(bound, false, null); } else { if (log.isDebugEnabled()) { log.debug("stopRemotePortForwarding(" + remote + ") no binding found"); @@ -312,6 +349,68 
@@ public class DefaultTcpipForwarder } } + protected void signalTearingDownExplicitTunnel(SshdSocketAddress boundAddress, boolean localForwarding) throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalTearingDownExplicitTunnel(l, boundAddress, localForwarding); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal tearing down explicit tunnel for local=" + localForwarding + + " on bound=" + boundAddress, t); + } + } + } + + protected void signalTearingDownExplicitTunnel( + PortForwardingEventListener listener, SshdSocketAddress boundAddress, boolean localForwarding) + throws IOException { + if (listener == null) { + return; + } + + listener.tearingDownExplicitTunnel(getSession(), boundAddress, localForwarding); + } + + protected void signalTornDownExplicitTunnel(SshdSocketAddress boundAddress, boolean localForwarding, Throwable reason) throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalTornDownExplicitTunnel(l, boundAddress, localForwarding, reason); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal torn down explicit tunnel local=" + localForwarding + + " on bound=" + boundAddress, t); + } + } + } + + protected void signalTornDownExplicitTunnel( + PortForwardingEventListener listener, SshdSocketAddress boundAddress, boolean localForwarding, Throwable reason) + throws IOException { + if (listener == null) { + return; + } + + 
listener.tornDownExplicitTunnel(getSession(), boundAddress, localForwarding, reason); + } + @Override public synchronized SshdSocketAddress startDynamicPortForwarding(SshdSocketAddress local) throws IOException { Objects.requireNonNull(local, "Local address is null"); @@ -328,8 +427,7 @@ public class DefaultTcpipForwarder SocksProxy prev; InetSocketAddress bound; int port; - PortForwardingEventListener listener = getPortForwardingEventListenerProxy(); - listener.establishingDynamicTunnel(getSession(), local); + signalEstablishingDynamicTunnel(local); try { bound = doBind(local, socksProxyIoHandlerFactory); port = bound.getPort(); @@ -346,7 +444,7 @@ public class DefaultTcpipForwarder } catch (IOException | RuntimeException err) { e.addSuppressed(err); } - listener.establishedDynamicTunnel(getSession(), local, null, e); + signalEstablishedDynamicTunnel(local, null, e); throw e; } @@ -356,7 +454,7 @@ public class DefaultTcpipForwarder log.debug("startDynamicPortForwarding(" + local + "): " + result); } - listener.establishedDynamicTunnel(getSession(), local, result, null); + signalEstablishedDynamicTunnel(local, result, null); return result; } catch (IOException | RuntimeException e) { stopDynamicPortForwarding(local); @@ -364,6 +462,67 @@ public class DefaultTcpipForwarder } } + protected void signalEstablishedDynamicTunnel( + SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason) + throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalEstablishedDynamicTunnel(l, local, boundAddress, reason); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal establishing dynamic tunnel for local=" + local + + " on bound=" + boundAddress, t); + } + } + } + + 
protected void signalEstablishedDynamicTunnel(PortForwardingEventListener listener, + SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason) + throws IOException { + if (listener == null) { + return; + } + + listener.establishedDynamicTunnel(getSession(), local, boundAddress, reason); + } + + protected void signalEstablishingDynamicTunnel(SshdSocketAddress local) throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalEstablishingDynamicTunnel(l, local); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal establishing dynamic tunnel for local=" + local, t); + } + } + } + + protected void signalEstablishingDynamicTunnel(PortForwardingEventListener listener, SshdSocketAddress local) throws IOException { + if (listener == null) { + return; + } + + listener.establishingDynamicTunnel(getSession(), local); + } + @Override public synchronized void stopDynamicPortForwarding(SshdSocketAddress local) throws IOException { SocksProxy obj; @@ -376,17 +535,16 @@ public class DefaultTcpipForwarder log.debug("stopDynamicPortForwarding(" + local + ") unbinding"); } - PortForwardingEventListener listener = getPortForwardingEventListenerProxy(); - listener.tearingDownDynamicTunnel(sessionInstance, local); + signalTearingDownDynamicTunnel(local); try { obj.close(true); acceptor.unbind(local.toInetSocketAddress()); } catch (RuntimeException e) { - listener.tornDownDynamicTunnel(getSession(), local, e); + signalTornDownDynamicTunnel(local, e); throw e; } - listener.tornDownDynamicTunnel(getSession(), local, null); + signalTornDownDynamicTunnel(local, null); } else { if (log.isDebugEnabled()) { log.debug("stopDynamicPortForwarding(" + local + ") no binding found"); @@ 
-394,6 +552,64 @@ public class DefaultTcpipForwarder } } + protected void signalTearingDownDynamicTunnel(SshdSocketAddress address) throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalTearingDownDynamicTunnel(l, address); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal tearing down dynamic tunnel for address=" + address, t); + } + } + } + + protected void signalTearingDownDynamicTunnel(PortForwardingEventListener listener, SshdSocketAddress address) throws IOException { + if (listener == null) { + return; + } + + listener.tearingDownDynamicTunnel(getSession(), address); + } + + protected void signalTornDownDynamicTunnel(SshdSocketAddress address, Throwable reason) throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalTornDownDynamicTunnel(l, address, reason); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal torn down dynamic tunnel for address=" + address, t); + } + } + } + + protected void signalTornDownDynamicTunnel( + PortForwardingEventListener listener, SshdSocketAddress address, Throwable reason) + throws IOException { + if (listener == null) { + return; + } + + listener.tornDownDynamicTunnel(getSession(), address, reason); + } + @Override public synchronized SshdSocketAddress getForwardedPort(int remotePort) { synchronized (remoteToLocal) { @@ -425,8 +641,7 @@ public class DefaultTcpipForwarder throw new RuntimeSshException(e); } - 
PortForwardingEventListener listener = getPortForwardingEventListenerProxy(); - listener.establishingExplicitTunnel(getSession(), local, null, true); + signalEstablishingExplicitTunnel(local, null, true); SshdSocketAddress result; try { InetSocketAddress bound = doBind(local, staticIoHandlerFactory); @@ -450,12 +665,12 @@ public class DefaultTcpipForwarder } catch (IOException | RuntimeException err) { e.addSuppressed(e); } - listener.establishedExplicitTunnel(getSession(), local, null, true, null, e); + signalEstablishedExplicitTunnel(local, null, true, null, e); throw e; } try { - listener.establishedExplicitTunnel(getSession(), local, null, true, result, null); + signalEstablishedExplicitTunnel(local, null, true, result, null); return result; } catch (IOException | RuntimeException e) { throw e; @@ -477,16 +692,15 @@ public class DefaultTcpipForwarder log.debug("localPortForwardingCancelled(" + local + ") unbind " + entry); } - PortForwardingEventListener listener = getPortForwardingEventListenerProxy(); - listener.tearingDownExplicitTunnel(getSession(), entry, true); + signalTearingDownExplicitTunnel(entry, true); try { acceptor.unbind(entry.toInetSocketAddress()); } catch (RuntimeException e) { - listener.tornDownExplicitTunnel(getSession(), entry, true, e); + signalTornDownExplicitTunnel(entry, true, e); throw e; } - listener.tornDownExplicitTunnel(getSession(), entry, true, null); + signalTornDownExplicitTunnel(entry, true, null); } else { if (log.isDebugEnabled()) { log.debug("localPortForwardingCancelled(" + local + ") no match/acceptor: " + entry); @@ -494,6 +708,159 @@ public class DefaultTcpipForwarder } } + protected void signalEstablishingExplicitTunnel( + SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding) + throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalEstablishingExplicitTunnel(l, local, remote, localForwarding); + return null; + }); + } catch (Throwable t) { + if (t instanceof 
RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal establishing explicit tunnel for local=" + local + + ", remote=" + remote + ", localForwarding=" + localForwarding, t); + } + } + } + + protected void signalEstablishingExplicitTunnel(PortForwardingEventListener listener, + SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding) + throws IOException { + if (listener == null) { + return; + } + + listener.establishingExplicitTunnel(getSession(), local, remote, localForwarding); + } + + protected void signalEstablishedExplicitTunnel( + SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding, + SshdSocketAddress boundAddress, Throwable reason) + throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalEstablishedExplicitTunnel(l, local, remote, localForwarding, boundAddress, reason); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal established explicit tunnel for local=" + local + + ", remote=" + remote + ", localForwarding=" + localForwarding + + ", bound=" + boundAddress, t); + } + } + } + + protected void signalEstablishedExplicitTunnel(PortForwardingEventListener listener, + SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding, + SshdSocketAddress boundAddress, Throwable reason) + throws IOException { + if (listener == null) { + return; + } + + listener.establishedExplicitTunnel(getSession(), local, remote, localForwarding, boundAddress, reason); + } + + protected void 
invokePortEventListenerSignaller(Invoker<PortForwardingEventListener, Void> invoker) throws Throwable { + Throwable err = null; + try { + invokePortEventListenerSignallerListeners(getDefaultListeners(), invoker); + } catch (Throwable t) { + Throwable e = GenericUtils.peelException(t); + err = GenericUtils.accumulateException(err, e); + } + + try { + invokePortEventListenerSignallerHolders(managersHolder, invoker); + } catch (Throwable t) { + Throwable e = GenericUtils.peelException(t); + err = GenericUtils.accumulateException(err, e); + } + + + if (err != null) { + throw err; + } + } + + protected void invokePortEventListenerSignallerListeners( + Collection<? extends PortForwardingEventListener> listeners, Invoker<PortForwardingEventListener, Void> invoker) + throws Throwable { + if (GenericUtils.isEmpty(listeners)) { + return; + } + + Throwable err = null; + // Need to go over the hierarchy (session, factory managed, connection service, etc...) + for (PortForwardingEventListener l : listeners) { + if (l == null) { + continue; + } + + try { + invoker.invoke(l); + } catch (Throwable t) { + Throwable e = GenericUtils.peelException(t); + err = GenericUtils.accumulateException(err, e); + } + } + + if (err != null) { + throw err; + } + } + + protected void invokePortEventListenerSignallerHolders( + Collection<? extends PortForwardingEventListenerManager> holders, Invoker<PortForwardingEventListener, Void> invoker) + throws Throwable { + if (GenericUtils.isEmpty(holders)) { + return; + } + + Throwable err = null; + // Need to go over the hierarchy (session, factory managed, connection service, etc...) 
+ for (PortForwardingEventListenerManager m : holders) { + try { + PortForwardingEventListener listener = m.getPortForwardingEventListenerProxy(); + if (listener != null) { + invoker.invoke(listener); + } + } catch (Throwable t) { + Throwable e = GenericUtils.peelException(t); + err = GenericUtils.accumulateException(err, e); + } + + if (m instanceof PortForwardingEventListenerManagerHolder) { + try { + invokePortEventListenerSignallerHolders(((PortForwardingEventListenerManagerHolder) m).getRegisteredManagers(), invoker); + } catch (Throwable t) { + Throwable e = GenericUtils.peelException(t); + err = GenericUtils.accumulateException(err, e); + } + } + } + + if (err != null) { + throw err; + } + } + @Override protected synchronized Closeable getInnerCloseable() { return builder().parallel(dynamicLocal.values()).close(acceptor).build(); @@ -502,6 +869,7 @@ public class DefaultTcpipForwarder @Override protected void preClose() { this.listeners.clear(); + this.managersHolder.clear(); super.preClose(); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java index b100524..808f41d 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java @@ -19,7 +19,7 @@ package org.apache.sshd.common.forward; import java.util.Collection; -import java.util.Objects; +import java.util.concurrent.CopyOnWriteArraySet; import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.util.EventListenerUtils; @@ -40,10 +40,14 @@ public class DefaultTcpipForwarderFactory 
implements TcpipForwarderFactory, Port public void removePortForwardingEventListener(PortForwardingEventListener listener) { throw new UnsupportedOperationException("removePortForwardingEventListener(" + listener + ") N/A on default instance"); } + + @Override + public PortForwardingEventListener getPortForwardingEventListenerProxy() { + return PortForwardingEventListener.EMPTY; + } }; - private final Collection<PortForwardingEventListener> listeners = - EventListenerUtils.synchronizedListenersSet(); + private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>(); private final PortForwardingEventListener listenerProxy; public DefaultTcpipForwarderFactory() { @@ -57,7 +61,7 @@ public class DefaultTcpipForwarderFactory implements TcpipForwarderFactory, Port @Override public void addPortForwardingEventListener(PortForwardingEventListener listener) { - listeners.add(Objects.requireNonNull(listener, "No listener to add")); + listeners.add(PortForwardingEventListener.validateListener(listener)); } @Override @@ -66,13 +70,13 @@ public class DefaultTcpipForwarderFactory implements TcpipForwarderFactory, Port return; } - listeners.remove(listener); + listeners.remove(PortForwardingEventListener.validateListener(listener)); } @Override public TcpipForwarder create(ConnectionService service) { TcpipForwarder forwarder = new DefaultTcpipForwarder(service); - forwarder.addPortForwardingEventListener(getPortForwardingEventListenerProxy()); + forwarder.addPortForwardingEventListenerManager(this); return forwarder; } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java index 857fbfe..fde2d6d 
100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java @@ -154,4 +154,8 @@ public interface PortForwardingEventListener extends SshdEventListener { default void tornDownDynamicTunnel(Session session, SshdSocketAddress address, Throwable reason) throws IOException { // ignored } + + static <L extends PortForwardingEventListener> L validateListener(L listener) { + return SshdEventListener.validateListener(listener, PortForwardingEventListener.class.getSimpleName()); + } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManagerHolder.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManagerHolder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManagerHolder.java new file mode 100644 index 0000000..71a607b --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManagerHolder.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.common.forward; + +import java.util.Collection; + +/** + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public interface PortForwardingEventListenerManagerHolder { + /** + * @return The currently registered managers. <B>Note:</B> it is highly + * recommended that implementors return either an un-modifiable collection + * or a <U>copy</U> of the current one. Callers should avoid modifying + * the retrieved value. + */ + Collection<PortForwardingEventListenerManager> getRegisteredManagers(); + + boolean addPortForwardingEventListenerManager(PortForwardingEventListenerManager manager); + boolean removePortForwardingEventListenerManager(PortForwardingEventListenerManager manager); +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java index 07f4405..5e3d9ce 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java @@ -24,7 +24,11 @@ import java.io.IOException; import org.apache.sshd.common.Closeable; import org.apache.sshd.common.util.net.SshdSocketAddress; -public interface TcpipForwarder extends PortForwardingManager, PortForwardingEventListenerManager, Closeable { +public interface TcpipForwarder + extends PortForwardingManager, + PortForwardingEventListenerManager, + PortForwardingEventListenerManagerHolder, + Closeable { /** * @param remotePort The remote port * @return The local {@link SshdSocketAddress} that the remote port is forwarded to 
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java index 0b4802d..03adb22 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -76,14 +77,11 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i protected List<RequestHandler<ConnectionService>> globalRequestHandlers; protected SessionTimeoutListener sessionTimeoutListener; protected ScheduledFuture<?> timeoutListenerFuture; - protected final Collection<SessionListener> sessionListeners = - EventListenerUtils.synchronizedListenersSet(); + protected final Collection<SessionListener> sessionListeners = new CopyOnWriteArraySet<>(); protected final SessionListener sessionListenerProxy; - protected final Collection<ChannelListener> channelListeners = - EventListenerUtils.synchronizedListenersSet(); + protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>(); protected final ChannelListener channelListenerProxy; - protected final Collection<PortForwardingEventListener> tunnelListeners = - EventListenerUtils.synchronizedListenersSet(); + protected final Collection<PortForwardingEventListener> tunnelListeners = new CopyOnWriteArraySet<>(); protected final 
PortForwardingEventListener tunnelListenerProxy; private final Map<String, Object> properties = new ConcurrentHashMap<>(); @@ -275,7 +273,8 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i @Override public void addSessionListener(SessionListener listener) { - ValidateUtils.checkNotNull(listener, "addSessionListener(%s) null instance", this); + SessionListener.validateListener(listener); + // avoid race conditions on notifications while manager is being closed if (!isOpen()) { log.warn("addSessionListener({})[{}] ignore registration while manager is closing", this, listener); @@ -295,6 +294,12 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i @Override public void removeSessionListener(SessionListener listener) { + if (listener == null) { + return; + } + + SessionListener.validateListener(listener); + if (this.sessionListeners.remove(listener)) { if (log.isTraceEnabled()) { log.trace("removeSessionListener({})[{}] removed", this, listener); @@ -313,7 +318,8 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i @Override public void addChannelListener(ChannelListener listener) { - ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this); + ChannelListener.validateListener(listener); + // avoid race conditions on notifications while manager is being closed if (!isOpen()) { log.warn("addChannelListener({})[{}] ignore registration while session is closing", this, listener); @@ -333,6 +339,11 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i @Override public void removeChannelListener(ChannelListener listener) { + if (listener == null) { + return; + } + + ChannelListener.validateListener(listener); if (this.channelListeners.remove(listener)) { if (log.isTraceEnabled()) { log.trace("removeChannelListener({})[{}] removed", this, listener); @@ -356,7 +367,8 @@ public abstract class AbstractFactoryManager extends 
AbstractKexFactoryManager i @Override public void addPortForwardingEventListener(PortForwardingEventListener listener) { - ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this); + PortForwardingEventListener.validateListener(listener); + // avoid race conditions on notifications while session is being closed if (!isOpen()) { log.warn("addPortForwardingEventListener({})[{}] ignore registration while session is closing", this, listener); @@ -380,6 +392,7 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i return; } + PortForwardingEventListener.validateListener(listener); if (this.tunnelListeners.remove(listener)) { if (log.isTraceEnabled()) { log.trace("removePortForwardingEventListener({})[{}] removed", this, listener); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTransferEventListener.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTransferEventListener.java b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTransferEventListener.java index 3acae88..d7954e0 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTransferEventListener.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTransferEventListener.java @@ -98,4 +98,8 @@ public interface ScpTransferEventListener extends SshdEventListener { throws IOException { // ignored } + + static <L extends ScpTransferEventListener> L validateListener(L listener) { + return SshdEventListener.validateListener(listener, ScpTransferEventListener.class.getSimpleName()); + } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java ---------------------------------------------------------------------- diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java index 779e215..1d80778 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java @@ -21,6 +21,7 @@ package org.apache.sshd.common.session; import java.util.Collection; import java.util.Objects; +import java.util.concurrent.CopyOnWriteArraySet; import org.apache.sshd.common.forward.PortForwardingEventListener; import org.apache.sshd.common.forward.PortForwardingEventListenerManager; @@ -31,8 +32,7 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean; * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public abstract class AbstractConnectionServiceFactory extends AbstractLoggingBean implements PortForwardingEventListenerManager { - private final Collection<PortForwardingEventListener> listeners = - EventListenerUtils.synchronizedListenersSet(); + private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>(); private final PortForwardingEventListener listenerProxy; protected AbstractConnectionServiceFactory() { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java index a07046e..9795e5c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java @@ -24,6 +24,7 @@ import org.apache.sshd.agent.common.AgentForwardSupport; import 
org.apache.sshd.common.Service; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.forward.PortForwardingEventListenerManager; +import org.apache.sshd.common.forward.PortForwardingEventListenerManagerHolder; import org.apache.sshd.common.forward.TcpipForwarder; import org.apache.sshd.server.x11.X11ForwardSupport; @@ -32,7 +33,7 @@ import org.apache.sshd.server.x11.X11ForwardSupport; * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public interface ConnectionService extends Service, PortForwardingEventListenerManager { +public interface ConnectionService extends Service, PortForwardingEventListenerManager, PortForwardingEventListenerManagerHolder { /** * Register a newly created channel with a new unique identifier * http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListener.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListener.java b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListener.java index 8466ee7..f3280dd 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListener.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListener.java @@ -101,4 +101,8 @@ public interface SessionListener extends SshdEventListener { default void sessionClosed(Session session) { // ignored } + + static <L extends SessionListener> L validateListener(L listener) { + return SshdEventListener.validateListener(listener, SessionListener.class.getSimpleName()); + } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java ---------------------------------------------------------------------- diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java index 368045e..60f98e8 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java @@ -19,11 +19,14 @@ package org.apache.sshd.common.session.helpers; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -38,12 +41,13 @@ import org.apache.sshd.common.NamedFactory; import org.apache.sshd.common.PropertyResolverUtils; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; +import org.apache.sshd.common.channel.AbstractChannel; import org.apache.sshd.common.channel.Channel; -import org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.channel.OpenChannelException; import org.apache.sshd.common.channel.RequestHandler; import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.forward.PortForwardingEventListener; +import org.apache.sshd.common.forward.PortForwardingEventListenerManager; import org.apache.sshd.common.forward.TcpipForwarder; import org.apache.sshd.common.forward.TcpipForwarderFactory; import org.apache.sshd.common.io.AbstractIoWriteFuture; @@ -98,10 +102,9 @@ public abstract class AbstractConnectionService<S extends AbstractSession> private final AtomicReference<X11ForwardSupport> x11ForwardHolder = new AtomicReference<>(); private final AtomicReference<TcpipForwarder> 
tcpipForwarderHolder = new AtomicReference<>(); private final AtomicBoolean allowMoreSessions = new AtomicBoolean(true); - private final Collection<PortForwardingEventListener> listeners = - EventListenerUtils.synchronizedListenersSet(); + private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>(); + private final Collection<PortForwardingEventListenerManager> managersHolder = new CopyOnWriteArraySet<>(); private final PortForwardingEventListener listenerProxy; - private final S sessionInstance; protected AbstractConnectionService(S session) { @@ -116,7 +119,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> @Override public void addPortForwardingEventListener(PortForwardingEventListener listener) { - listeners.add(Objects.requireNonNull(listener, "No listener to add")); + listeners.add(PortForwardingEventListener.validateListener(listener)); } @Override @@ -125,7 +128,26 @@ public abstract class AbstractConnectionService<S extends AbstractSession> return; } - listeners.remove(listener); + listeners.remove(PortForwardingEventListener.validateListener(listener)); + } + + @Override + public Collection<PortForwardingEventListenerManager> getRegisteredManagers() { + return managersHolder.isEmpty() ? 
Collections.emptyList() : new ArrayList<>(managersHolder); + } + + @Override + public boolean addPortForwardingEventListenerManager(PortForwardingEventListenerManager manager) { + return managersHolder.add(Objects.requireNonNull(manager, "No manager")); + } + + @Override + public boolean removePortForwardingEventListenerManager(PortForwardingEventListenerManager manager) { + if (manager == null) { + return false; + } + + return managersHolder.remove(manager); } public Collection<Channel> getChannels() { @@ -165,6 +187,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> @Override protected void preClose() { this.listeners.clear(); + this.managersHolder.clear(); super.preClose(); } @@ -174,8 +197,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> TcpipForwarderFactory factory = Objects.requireNonNull(manager.getTcpipForwarderFactory(), "No forwarder factory"); TcpipForwarder forwarder = factory.create(this); - forwarder.addPortForwardingEventListener(getPortForwardingEventListenerProxy()); - forwarder.addPortForwardingEventListener(session.getPortForwardingEventListenerProxy()); + forwarder.addPortForwardingEventListenerManager(this); return forwarder; } @@ -272,26 +294,9 @@ public abstract class AbstractConnectionService<S extends AbstractSession> protected void handleChannelRegistrationFailure(Channel channel, int channelId) throws IOException { RuntimeException reason = new IllegalStateException("Channel id=" + channelId + " not registered because session is being closed: " + this); - ChannelListener listener = channel.getChannelListenerProxy(); - try { - listener.channelClosed(channel, reason); - } catch (Throwable err) { - Throwable ignored = GenericUtils.peelException(err); - log.warn("registerChannel({})[{}] failed ({}) to inform of channel closure: {}", - this, channel, ignored.getClass().getSimpleName(), ignored.getMessage()); - if (log.isDebugEnabled()) { - log.debug("registerChannel(" + this + ")[" + 
channel + "] inform closure failure details", ignored); - } - if (log.isTraceEnabled()) { - Throwable[] suppressed = ignored.getSuppressed(); - if (GenericUtils.length(suppressed) > 0) { - for (Throwable s : suppressed) { - log.trace("registerChannel(" + this + ")[" + channel + "] suppressed channel closed signalling", s); - } - } - } - } - + AbstractChannel notifier = + ValidateUtils.checkInstanceOf(channel, AbstractChannel.class, "Non abstract channel for id=%d", channelId); + notifier.signalChannelClosed(reason); throw reason; }