[SSHD-706] Re-factored the code that invokes the various listeners to avoid 
using proxies


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/38c84a83
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/38c84a83
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/38c84a83

Branch: refs/heads/master
Commit: 38c84a830e5078b62912cb8131f83e8ac82b2742
Parents: 6c92dcc
Author: Lyor Goldstein <lyor.goldst...@gmail.com>
Authored: Sun Nov 6 18:34:00 2016 +0200
Committer: Lyor Goldstein <lyor.goldst...@gmail.com>
Committed: Sun Nov 6 18:34:00 2016 +0200

----------------------------------------------------------------------
 .../agent/local/ChannelAgentForwarding.java     |  30 +-
 .../sshd/agent/unix/ChannelAgentForwarding.java |  25 +-
 .../client/channel/AbstractClientChannel.java   |  24 +-
 .../session/ClientConnectionServiceFactory.java |   7 +-
 .../sshd/client/session/ClientSessionImpl.java  |  26 +-
 .../sshd/common/channel/AbstractChannel.java    | 193 ++++++--
 .../sshd/common/channel/ChannelListener.java    |   5 +
 .../common/forward/DefaultTcpipForwarder.java   | 440 +++++++++++++++++--
 .../forward/DefaultTcpipForwarderFactory.java   |  16 +-
 .../forward/PortForwardingEventListener.java    |   4 +
 ...ortForwardingEventListenerManagerHolder.java |  38 ++
 .../sshd/common/forward/TcpipForwarder.java     |   6 +-
 .../common/helpers/AbstractFactoryManager.java  |  31 +-
 .../common/scp/ScpTransferEventListener.java    |   4 +
 .../AbstractConnectionServiceFactory.java       |   4 +-
 .../sshd/common/session/ConnectionService.java  |   3 +-
 .../sshd/common/session/SessionListener.java    |   4 +
 .../helpers/AbstractConnectionService.java      |  61 +--
 .../common/session/helpers/AbstractSession.java | 270 +++++++++---
 .../sshd/common/util/EventListenerUtils.java    |  21 +-
 .../org/apache/sshd/common/util/Invoker.java    | 113 +++++
 .../sshd/common/util/SshdEventListener.java     |  17 +-
 .../org/apache/sshd/server/SignalListener.java  |   4 +
 .../apache/sshd/server/StandardEnvironment.java |  13 +-
 .../server/channel/AbstractServerChannel.java   |  27 +-
 .../sshd/server/forward/TcpipServerChannel.java |  71 +--
 .../sshd/server/scp/ScpCommandFactory.java      |   6 +-
 .../session/ServerConnectionServiceFactory.java |   7 +-
 .../sshd/server/session/ServerSessionImpl.java  |  27 +-
 .../sftp/AbstractSftpEventListenerManager.java  |  13 +-
 .../subsystem/sftp/SftpEventListener.java       |   4 +
 .../server/subsystem/sftp/SftpSubsystem.java    |  12 +-
 .../test/java/org/apache/sshd/ProxyTest.java    |  10 +
 33 files changed, 1136 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
 
b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
index d053a31..7aeb35b 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
@@ -29,7 +29,6 @@ import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.SshConstants;
-import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.session.Session;
@@ -53,8 +52,8 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
 
     @Override
     protected OpenFuture doInit(Buffer buffer) {
-        final OpenFuture f = new DefaultOpenFuture(this);
-        ChannelListener listener = getChannelListenerProxy();
+        OpenFuture f = new DefaultOpenFuture(this);
+        String changeEvent = "auth-agent";
         try {
             out = new ChannelOutputStream(this, getRemoteWindow(), log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
 
@@ -64,30 +63,15 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
             agent = factory.createClient(manager);
             client = new AgentClient();
 
-            listener.channelOpenSuccess(this);
+            signalChannelOpenSuccess();
             f.setOpened();
         } catch (Throwable t) {
             Throwable e = GenericUtils.peelException(t);
-            try {
-                listener.channelOpenFailure(this, e);
-            } catch (Throwable err) {
-                Throwable ignored = GenericUtils.peelException(err);
-                log.warn("doInit({}) failed ({}) to inform listener of open 
failure={}: {}",
-                         this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
-                if (log.isDebugEnabled()) {
-                    log.debug("doInit(" + this + ") inform listener open 
failure details", ignored);
-                }
-
-                if (log.isTraceEnabled()) {
-                    Throwable[] suppressed = ignored.getSuppressed();
-                    if (GenericUtils.length(suppressed) > 0) {
-                        for (Throwable s : suppressed) {
-                            log.trace("doInit(" + this + ") suppressed channel 
open failure signalling", s);
-                        }
-                    }
-                }
-            }
+            changeEvent = e.getClass().getSimpleName();
+            signalChannelOpenFailure(e);
             f.setException(e);
+        } finally {
+            notifyStateChanged(changeEvent);
         }
         return f;
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
index ecafc34..42bc3d0 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
@@ -29,7 +29,6 @@ import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
-import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.util.GenericUtils;
@@ -73,8 +72,7 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
 
     @Override
     protected OpenFuture doInit(Buffer buffer) {
-        final OpenFuture f = new DefaultOpenFuture(this);
-        ChannelListener listener = getChannelListenerProxy();
+        OpenFuture f = new DefaultOpenFuture(this);
         try {
             out = new ChannelOutputStream(this, getRemoteWindow(), log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
             authSocket = PropertyResolverUtils.getString(this, 
SshAgent.SSH_AUTHSOCKET_ENV_NAME);
@@ -108,28 +106,11 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
                 }
             });
 
-            listener.channelOpenSuccess(this);
+            signalChannelOpenSuccess();
             f.setOpened();
         } catch (Throwable t) {
             Throwable e = GenericUtils.peelException(t);
-            try {
-                listener.channelOpenFailure(this, e);
-            } catch (Throwable err) {
-                Throwable ignored = GenericUtils.peelException(err);
-                log.warn("doInit({}) failed ({}) to inform listener of open 
failure={}: {}",
-                         this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
-                if (log.isDebugEnabled()) {
-                    log.debug("doInit(" + this + ") inform listener open 
failure details", ignored);
-                }
-                if (log.isTraceEnabled()) {
-                    Throwable[] suppressed = ignored.getSuppressed();
-                    if (GenericUtils.length(suppressed) > 0) {
-                        for (Throwable s : suppressed) {
-                            log.trace("doInit(" + this + ") suppressed channel 
open failure signalling", s);
-                        }
-                    }
-                }
-            }
+            signalChannelOpenFailure(e);
             f.setException(e);
         }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 4c3aee3..6adf2f6 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -41,7 +41,6 @@ import org.apache.sshd.common.channel.AbstractChannel;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelAsyncInputStream;
 import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
-import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.RequestHandler;
 import org.apache.sshd.common.channel.Window;
 import org.apache.sshd.common.io.IoInputStream;
@@ -328,36 +327,17 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
         Window wRemote = getRemoteWindow();
         wRemote.init(rwSize, packetSize, manager.getProperties());
 
-        ChannelListener listener = getChannelListenerProxy();
         String changeEvent = "SSH_MSG_CHANNEL_OPEN_CONFIRMATION";
         try {
             doOpen();
 
-            listener.channelOpenSuccess(this);
+            signalChannelOpenSuccess();
             this.opened.set(true);
             this.openFuture.setOpened();
         } catch (Throwable t) {
             Throwable e = GenericUtils.peelException(t);
             changeEvent = e.getClass().getName();
-            try {
-                listener.channelOpenFailure(this, e);
-            } catch (Throwable err) {
-                Throwable ignored = GenericUtils.peelException(err);
-                log.warn("handleOpenSuccess({}) failed ({}) to inform listener 
of open failure={}: {}",
-                         this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
-                if (log.isDebugEnabled()) {
-                    log.debug("handleOpenSuccess(" + this + ") inform listener 
open failure details", ignored);
-                }
-                if (log.isTraceEnabled()) {
-                    Throwable[] suppressed = ignored.getSuppressed();
-                    if (GenericUtils.length(suppressed) > 0) {
-                        for (Throwable s : suppressed) {
-                            log.trace("handleOpenSuccess(" + this + ") 
suppressed channel open failure signalling", s);
-                        }
-                    }
-                }
-            }
-
+            signalChannelOpenFailure(e);
             this.openFuture.setException(e);
             this.closeFuture.setClosed();
             this.doCloseImmediately();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
index 9a7505a..d8f90d0 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
@@ -41,6 +41,11 @@ public class ClientConnectionServiceFactory extends 
AbstractConnectionServiceFac
         public void 
removePortForwardingEventListener(PortForwardingEventListener listener) {
             throw new 
UnsupportedOperationException("removePortForwardingEventListener(" + listener + 
") N/A on default instance");
         }
+
+        @Override
+        public PortForwardingEventListener 
getPortForwardingEventListenerProxy() {
+            return PortForwardingEventListener.EMPTY;
+        }
     };
 
     public ClientConnectionServiceFactory() {
@@ -57,7 +62,7 @@ public class ClientConnectionServiceFactory extends 
AbstractConnectionServiceFac
         AbstractClientSession abstractSession =
                 ValidateUtils.checkInstanceOf(session, 
AbstractClientSession.class, "Not a client session: %s", session);
         ClientConnectionService service = new 
ClientConnectionService(abstractSession);
-        
service.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+        service.addPortForwardingEventListenerManager(this);
         return service;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
index 28a72ad..4ae713f 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
@@ -32,7 +32,6 @@ import java.util.Set;
 import org.apache.sshd.client.ClientFactoryManager;
 import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.client.future.DefaultAuthFuture;
-import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.Service;
 import org.apache.sshd.common.ServiceFactory;
 import org.apache.sshd.common.SshConstants;
@@ -87,26 +86,7 @@ public class ClientSessionImpl extends AbstractClientSession 
{
         authFuture = new DefaultAuthFuture(lock);
         authFuture.setAuthed(false);
 
-        // Inform the listener of the newly created session
-        SessionListener listener = getSessionListenerProxy();
-        try {
-            listener.sessionCreated(this);
-        } catch (Throwable t) {
-            Throwable e = GenericUtils.peelException(t);
-            if (log.isDebugEnabled()) {
-                log.debug("Failed ({}) to announce session={} created: {}",
-                          e.getClass().getSimpleName(), ioSession, 
e.getMessage());
-            }
-            if (log.isTraceEnabled()) {
-                log.trace("Session=" + ioSession + " creation failure 
details", e);
-            }
-            if (e instanceof Exception) {
-                throw (Exception) e;
-            } else {
-                throw new RuntimeSshException(e);
-            }
-        }
-
+        signalSessionCreated(ioSession);
         sendClientIdentification();
         kexState.set(KexState.INIT);
         sendKexInit();
@@ -188,14 +168,14 @@ public class ClientSessionImpl extends 
AbstractClientSession {
     }
 
     @Override
-    protected void sendSessionEvent(SessionListener.Event event) throws 
IOException {
+    protected void signalSessionEvent(SessionListener.Event event) throws 
IOException {
         if (SessionListener.Event.KeyEstablished.equals(event)) {
             sendInitialServiceRequest();
         }
         synchronized (lock) {
             lock.notifyAll();
         }
-        super.sendSessionEvent(event);
+        super.signalSessionEvent(event);
     }
 
     protected void sendInitialServiceRequest() throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index b4810b2..71cb861 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -50,6 +51,7 @@ import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Int2IntFunction;
+import org.apache.sshd.common.util.Invoker;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.BufferUtils;
@@ -85,8 +87,7 @@ public abstract class AbstractChannel
     /**
      * Channel events listener
      */
-    protected final Collection<ChannelListener> channelListeners =
-            EventListenerUtils.synchronizedListenersSet();
+    protected final Collection<ChannelListener> channelListeners = new 
CopyOnWriteArraySet<>();
     protected final ChannelListener channelListenerProxy;
 
     private int id = -1;
@@ -352,11 +353,19 @@ public abstract class AbstractChannel
         this.sessionInstance = session;
         this.id = id;
 
-        ChannelListener listener = session.getChannelListenerProxy();
+        signalChannelInitialized();
+        configureWindow();
+        initialized.set(true);
+    }
+
+    protected void signalChannelInitialized() throws IOException {
         try {
-            listener.channelInitialized(this);
-        } catch (Throwable t) {
-            Throwable e = GenericUtils.peelException(t);
+            invokeChannelSignaller(l -> {
+                signalChannelInitialized(l);
+                return null;
+            });
+        } catch (Throwable err) {
+            Throwable e = GenericUtils.peelException(err);
             if (e instanceof IOException) {
                 throw (IOException) e;
             } else if (e instanceof RuntimeException) {
@@ -365,10 +374,39 @@ public abstract class AbstractChannel
                 throw new IOException("Failed (" + 
e.getClass().getSimpleName() + ") to notify channel " + this + " 
initialization: " + e.getMessage(), e);
             }
         }
-        // delegate the rest of the notifications to the channel
-        addChannelListener(listener);
-        configureWindow();
-        initialized.set(true);
+    }
+
+    protected void signalChannelInitialized(ChannelListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        listener.channelInitialized(this);
+    }
+
+    protected void signalChannelOpenSuccess() {
+        try {
+            invokeChannelSignaller(l -> {
+                signalChannelOpenSuccess(l);
+                return null;
+            });
+        } catch (Throwable err) {
+            if (err instanceof RuntimeException) {
+                throw (RuntimeException) err;
+            } else if (err instanceof Error) {
+                throw (Error) err;
+            } else {
+                throw new RuntimeException(err);
+            }
+        }
+    }
+
+    protected void signalChannelOpenSuccess(ChannelListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        listener.channelOpenSuccess(this);
     }
 
     @Override
@@ -376,27 +414,70 @@ public abstract class AbstractChannel
         return initialized.get();
     }
 
+    protected void signalChannelOpenFailure(Throwable reason) {
+        try {
+            invokeChannelSignaller(l -> {
+                signalChannelOpenFailure(l, reason);
+                return null;
+            });
+        } catch (Throwable err) {
+            Throwable ignored = GenericUtils.peelException(err);
+            log.warn("signalChannelOpenFailure({}) failed ({}) to inform 
listener of open failure={}: {}",
+                     this, ignored.getClass().getSimpleName(), 
reason.getClass().getSimpleName(), ignored.getMessage());
+            if (log.isDebugEnabled()) {
+                log.debug("doInit(" + this + ") inform listener open failure 
details", ignored);
+            }
+
+            if (log.isTraceEnabled()) {
+                Throwable[] suppressed = ignored.getSuppressed();
+                if (GenericUtils.length(suppressed) > 0) {
+                    for (Throwable s : suppressed) {
+                        log.trace("signalChannelOpenFailure(" + this + ") 
suppressed channel open failure signalling", s);
+                    }
+                }
+            }
+        }
+    }
+
+    protected void signalChannelOpenFailure(ChannelListener listener, 
Throwable reason) {
+        if (listener == null) {
+            return;
+        }
+
+        listener.channelOpenFailure(this, reason);
+    }
+
     protected void notifyStateChanged(String hint) {
-        ChannelListener listener = getChannelListenerProxy();
         try {
-            listener.channelStateChanged(this, hint);
-        } catch (Throwable t) {
-            Throwable e = GenericUtils.peelException(t);
+            invokeChannelSignaller(l -> {
+                notifyStateChanged(l, hint);
+                return null;
+            });
+        } catch (Throwable err) {
+            Throwable e = GenericUtils.peelException(err);
             log.warn("notifyStateChanged({})[{}] {} while signal channel state 
change: {}",
                      this, hint, e.getClass().getSimpleName(), e.getMessage());
             if (log.isDebugEnabled()) {
                 log.debug("notifyStateChanged(" + this + ")[" + hint + "] 
channel state signalling failure details", e);
             }
+        } finally {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
         }
+    }
 
-        synchronized (lock) {
-            lock.notifyAll();
+    protected void notifyStateChanged(ChannelListener listener, String hint) {
+        if (listener == null) {
+            return;
         }
+
+        listener.channelStateChanged(this, hint);
     }
 
     @Override
     public void addChannelListener(ChannelListener listener) {
-        ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null 
instance", this);
+        ChannelListener.validateListener(listener);
         // avoid race conditions on notifications while channel is being closed
         if (!isOpen()) {
             log.warn("addChannelListener({})[{}] ignore registration while 
channel is closing", this, listener);
@@ -416,6 +497,11 @@ public abstract class AbstractChannel
 
     @Override
     public void removeChannelListener(ChannelListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        ChannelListener.validateListener(listener);
         if (this.channelListeners.remove(listener)) {
             if (log.isTraceEnabled()) {
                 log.trace("removeChannelListener({})[{}] removed", this, 
listener);
@@ -583,23 +669,8 @@ public abstract class AbstractChannel
 
     @Override
     protected void preClose() {
-        ChannelListener listener = getChannelListenerProxy();
         try {
-            listener.channelClosed(this, null);
-        } catch (Throwable t) {
-            Throwable e = GenericUtils.peelException(t);
-            log.warn("preClose({}) {} while signal channel closed: {}", this, 
e.getClass().getSimpleName(), e.getMessage());
-            if (log.isDebugEnabled()) {
-                log.debug("preClose(" + this + ") channel closed signalling 
failure details", e);
-            }
-            if (log.isTraceEnabled()) {
-                Throwable[] suppressed = e.getSuppressed();
-                if (GenericUtils.length(suppressed) > 0) {
-                    for (Throwable s : suppressed) {
-                        log.trace("preClose(" + this + ") suppressed closed 
channel signalling failure", s);
-                    }
-                }
-            }
+            signalChannelClosed(null);
         } finally {
             // clear the listeners since we are closing the channel (quicker 
GC)
             this.channelListeners.clear();
@@ -624,6 +695,62 @@ public abstract class AbstractChannel
         super.preClose();
     }
 
+    public void signalChannelClosed(Throwable reason) {
+        try {
+            invokeChannelSignaller(l -> {
+                signalChannelClosed(l, reason);
+                return null;
+            });
+        } catch (Throwable err) {
+            Throwable e = GenericUtils.peelException(err);
+            log.warn("signalChannelClosed({}) {} while signal channel closed: 
{}", this, e.getClass().getSimpleName(), e.getMessage());
+            if (log.isDebugEnabled()) {
+                log.debug("signalChannelClosed(" + this + ") channel closed 
signalling failure details", e);
+            }
+            if (log.isTraceEnabled()) {
+                Throwable[] suppressed = e.getSuppressed();
+                if (GenericUtils.length(suppressed) > 0) {
+                    for (Throwable s : suppressed) {
+                        log.trace("signalChannelClosed(" + this + ") 
suppressed closed channel signalling failure", s);
+                    }
+                }
+            }
+        }
+    }
+
+    protected void signalChannelClosed(ChannelListener listener, Throwable 
reason) {
+        if (listener == null) {
+            return;
+        }
+
+        listener.channelClosed(this, reason);
+    }
+
+    protected void invokeChannelSignaller(Invoker<ChannelListener, Void> 
invoker) throws Throwable {
+        Session session = getSession();
+        FactoryManager manager = (session == null) ? null : 
session.getFactoryManager();
+        ChannelListener[] listeners = {
+            (manager == null) ? null : manager.getChannelListenerProxy(),
+            (session == null) ? null : session.getChannelListenerProxy(),
+            getChannelListenerProxy()
+        };
+
+        Throwable err = null;
+        for (ChannelListener l : listeners) {
+            if (l == null) {
+                continue;
+            }
+            try {
+                invoker.invoke(l);
+            } catch (Throwable t) {
+                err = GenericUtils.accumulateException(err, t);
+            }
+        }
+
+        if (err != null) {
+            throw err;
+        }
+    }
     @Override
     protected void doCloseImmediately() {
         if (service != null) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
index 0124fca..ba0d72f 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
@@ -80,6 +80,7 @@ public interface ChannelListener extends SshdEventListener {
      * @param channel The {@link Channel} whose state has changed
      * @param hint A &quot;hint&quot; as to the nature of the state change.
      * it can be a request name or a {@code SSH_MSG_CHANNEL_XXX} command
+     * or the name of an exception class
      */
     default void channelStateChanged(Channel channel, String hint) {
         // ignored
@@ -100,4 +101,8 @@ public interface ChannelListener extends SshdEventListener {
     default void channelClosed(Channel channel, Throwable reason) {
         // ignored
     }
+
+    static <L extends ChannelListener> L validateListener(L listener) {
+        return SshdEventListener.validateListener(listener, 
ChannelListener.class.getSimpleName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index 5462753..6288d41 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -30,6 +31,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.client.channel.ClientChannelEvent;
@@ -50,6 +52,7 @@ import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.SessionHolder;
 import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.Invoker;
 import org.apache.sshd.common.util.Readable;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
@@ -91,8 +94,8 @@ public class DefaultTcpipForwarder
     private final Map<Integer, SocksProxy> dynamicLocal = new HashMap<>();
     private final Set<LocalForwardingEntry> localForwards = new HashSet<>();
     private final IoHandlerFactory staticIoHandlerFactory = 
StaticIoHandler::new;
-    private final Collection<PortForwardingEventListener> listeners =
-            EventListenerUtils.synchronizedListenersSet();
+    private final Collection<PortForwardingEventListener> listeners = new 
CopyOnWriteArraySet<>();
+    private final Collection<PortForwardingEventListenerManager> 
managersHolder = new CopyOnWriteArraySet<>();
     private final PortForwardingEventListener listenerProxy;
 
     private IoAcceptor acceptor;
@@ -110,7 +113,7 @@ public class DefaultTcpipForwarder
 
     @Override
     public void addPortForwardingEventListener(PortForwardingEventListener 
listener) {
-        listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+        listeners.add(PortForwardingEventListener.validateListener(listener));
     }
 
     @Override
@@ -119,7 +122,26 @@ public class DefaultTcpipForwarder
             return;
         }
 
-        listeners.remove(listener);
+        
listeners.remove(PortForwardingEventListener.validateListener(listener));
+    }
+
+    /**
+     * @return A copy of the currently registered managers, or an empty
+     * collection if none are registered
+     */
+    @Override
+    public Collection<PortForwardingEventListenerManager> getRegisteredManagers() {
+        if (managersHolder.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        return new ArrayList<>(managersHolder);
+    }
+
+    /**
+     * Registers a manager whose listener proxy is consulted when signalling events.
+     *
+     * @param manager The manager to register - must not be {@code null}
+     * @return The result of adding the manager to the underlying set
+     */
+    @Override
+    public boolean addPortForwardingEventListenerManager(PortForwardingEventListenerManager manager) {
+        PortForwardingEventListenerManager validated = Objects.requireNonNull(manager, "No manager");
+        return managersHolder.add(validated);
+    }
+
+    /**
+     * Un-registers a previously registered manager.
+     *
+     * @param manager The manager to remove - ignored if {@code null}
+     * @return {@code false} for a {@code null} manager, otherwise the result
+     * of removing it from the underlying set
+     */
+    @Override
+    public boolean removePortForwardingEventListenerManager(PortForwardingEventListenerManager manager) {
+        // registration rejects null, so a null manager can never be present
+        return (manager != null) && managersHolder.remove(manager);
     }
 
     @Override
@@ -131,6 +153,25 @@ public class DefaultTcpipForwarder
         return service;
     }
 
+    /**
+     * Collects the listener proxies that are always signalled, in invocation
+     * order: this forwarder's own proxy, then the session's proxy and finally
+     * the session's factory manager proxy (the latter two only if available).
+     *
+     * @return A new, modifiable collection of the non-{@code null} session and
+     * factory manager proxies preceded by this forwarder's own proxy
+     */
+    protected Collection<PortForwardingEventListener> getDefaultListeners() {
+        Collection<PortForwardingEventListener> defaultListeners = new ArrayList<>();
+        defaultListeners.add(getPortForwardingEventListenerProxy());
+
+        Session session = getSession();
+        if (session == null) {
+            // no session => nothing further up the hierarchy to consult
+            return defaultListeners;
+        }
+
+        PortForwardingEventListener l = session.getPortForwardingEventListenerProxy();
+        if (l != null) {
+            defaultListeners.add(l);
+        }
+
+        FactoryManager manager = session.getFactoryManager();
+        l = (manager == null) ? null : manager.getPortForwardingEventListenerProxy();
+        if (l != null) {
+            defaultListeners.add(l);
+        }
+
+        return defaultListeners;
+    }
+
     //
     // TcpIpForwarder implementation
     //
@@ -150,8 +191,7 @@ public class DefaultTcpipForwarder
 
         InetSocketAddress bound;
         int port;
-        PortForwardingEventListener listener = 
getPortForwardingEventListenerProxy();
-        listener.establishingExplicitTunnel(getSession(), local, remote, true);
+        signalEstablishingExplicitTunnel(local, remote, true);
         try {
             bound = doBind(local, staticIoHandlerFactory);
             port = bound.getPort();
@@ -169,7 +209,7 @@ public class DefaultTcpipForwarder
             } catch (IOException | RuntimeException err) {
                 e.addSuppressed(err);
             }
-            listener.establishedExplicitTunnel(getSession(), local, remote, 
true, null, e);
+            signalEstablishedExplicitTunnel(local, remote, true, null, e);
             throw e;
         }
 
@@ -178,7 +218,7 @@ public class DefaultTcpipForwarder
             if (log.isDebugEnabled()) {
                 log.debug("startLocalPortForwarding(" + local + " -> " + 
remote + "): " + result);
             }
-            listener.establishedExplicitTunnel(getSession(), local, remote, 
true, result, null);
+            signalEstablishedExplicitTunnel(local, remote, true, result, null);
             return result;
         } catch (IOException | RuntimeException e) {
             stopLocalPortForwarding(local);
@@ -200,16 +240,15 @@ public class DefaultTcpipForwarder
                 log.debug("stopLocalPortForwarding(" + local + ") unbind " + 
bound);
             }
 
-            PortForwardingEventListener listener = 
getPortForwardingEventListenerProxy();
-            listener.tearingDownExplicitTunnel(getSession(), bound, true);
+            signalTearingDownExplicitTunnel(bound, true);
             try {
                 acceptor.unbind(bound.toInetSocketAddress());
             } catch (RuntimeException e) {
-                listener.tornDownExplicitTunnel(getSession(), bound, true, e);
+                signalTornDownExplicitTunnel(bound, true, e);
                 throw e;
             }
 
-            listener.tornDownExplicitTunnel(getSession(), bound, true, null);
+            signalTornDownExplicitTunnel(bound, true, null);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("stopLocalPortForwarding(" + local + ") no 
mapping/acceptor for " + bound);
@@ -234,8 +273,7 @@ public class DefaultTcpipForwarder
         long timeout = PropertyResolverUtils.getLongProperty(session, 
FORWARD_REQUEST_TIMEOUT, DEFAULT_FORWARD_REQUEST_TIMEOUT);
         Buffer result;
         int port;
-        PortForwardingEventListener listener = 
getPortForwardingEventListenerProxy();
-        listener.establishingExplicitTunnel(getSession(), local, remote, 
false);
+        signalEstablishingExplicitTunnel(local, remote, false);
         try {
             result = session.request("tcpip-forward", buffer, timeout, 
TimeUnit.MILLISECONDS);
             if (result == null) {
@@ -257,7 +295,7 @@ public class DefaultTcpipForwarder
             } catch (IOException | RuntimeException err) {
                 e.addSuppressed(err);
             }
-            listener.establishedExplicitTunnel(session, local, remote, false, 
null, e);
+            signalEstablishedExplicitTunnel(local, remote, false, null, e);
             throw e;
         }
 
@@ -267,7 +305,7 @@ public class DefaultTcpipForwarder
                 log.debug("startRemotePortForwarding(" + remote + " -> " + 
local + "): " + bound);
             }
 
-            listener.establishedExplicitTunnel(getSession(), local, remote, 
false, bound, null);
+            signalEstablishedExplicitTunnel(local, remote, false, bound, null);
             return bound;
         } catch (IOException | RuntimeException e) {
             stopRemotePortForwarding(remote);
@@ -295,16 +333,15 @@ public class DefaultTcpipForwarder
             buffer.putString(remoteHost);
             buffer.putInt(remote.getPort());
 
-            PortForwardingEventListener listener = 
getPortForwardingEventListenerProxy();
-            listener.tearingDownExplicitTunnel(getSession(), bound, false);
+            signalTearingDownExplicitTunnel(bound, false);
             try {
                 session.writePacket(buffer);
             } catch (IOException | RuntimeException e) {
-                listener.tornDownExplicitTunnel(getSession(), bound, false, e);
+                signalTornDownExplicitTunnel(bound, false, e);
                 throw e;
             }
 
-            listener.tornDownExplicitTunnel(getSession(), bound, false, null);
+            signalTornDownExplicitTunnel(bound, false, null);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("stopRemotePortForwarding(" + remote + ") no binding 
found");
@@ -312,6 +349,68 @@ public class DefaultTcpipForwarder
         }
     }
 
+    /**
+     * Signals all relevant listeners that an explicit tunnel is about to be torn down.
+     *
+     * @param boundAddress The bound tunnel address
+     * @param localForwarding Local/remote forwarding indicator
+     * @throws IOException If a listener failed - checked failures other than
+     * {@link IOException} are wrapped in one
+     */
+    protected void signalTearingDownExplicitTunnel(SshdSocketAddress boundAddress, boolean localForwarding) throws IOException {
+        try {
+            invokePortEventListenerSignaller(listener -> {
+                signalTearingDownExplicitTunnel(listener, boundAddress, localForwarding);
+                return null;
+            });
+        } catch (RuntimeException | Error | IOException e) {
+            // these are propagated unchanged
+            throw e;
+        } catch (Throwable t) {
+            throw new IOException("Failed (" + t.getClass().getSimpleName() + ")"
+                    + " to signal tearing down explicit tunnel for local=" + localForwarding
+                    + " on bound=" + boundAddress, t);
+        }
+    }
+
+    /**
+     * Signals a single listener that an explicit tunnel is about to be torn down.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param boundAddress The bound tunnel address
+     * @param localForwarding Local/remote forwarding indicator
+     * @throws IOException If the listener failed
+     */
+    protected void signalTearingDownExplicitTunnel(
+            PortForwardingEventListener listener, SshdSocketAddress boundAddress, boolean localForwarding)
+                    throws IOException {
+        if (listener != null) {
+            listener.tearingDownExplicitTunnel(getSession(), boundAddress, localForwarding);
+        }
+    }
+
+    /**
+     * Signals all relevant listeners that an explicit tunnel has been torn down.
+     *
+     * @param boundAddress The bound tunnel address
+     * @param localForwarding Local/remote forwarding indicator
+     * @param reason The tear-down failure - {@code null} if successful
+     * @throws IOException If a listener failed - checked failures other than
+     * {@link IOException} are wrapped in one
+     */
+    protected void signalTornDownExplicitTunnel(SshdSocketAddress boundAddress, boolean localForwarding, Throwable reason) throws IOException {
+        try {
+            invokePortEventListenerSignaller(listener -> {
+                signalTornDownExplicitTunnel(listener, boundAddress, localForwarding, reason);
+                return null;
+            });
+        } catch (RuntimeException | Error | IOException e) {
+            // these are propagated unchanged
+            throw e;
+        } catch (Throwable t) {
+            throw new IOException("Failed (" + t.getClass().getSimpleName() + ")"
+                    + " to signal torn down explicit tunnel local=" + localForwarding
+                    + " on bound=" + boundAddress, t);
+        }
+    }
+
+    /**
+     * Signals a single listener that an explicit tunnel has been torn down.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param boundAddress The bound tunnel address
+     * @param localForwarding Local/remote forwarding indicator
+     * @param reason The tear-down failure - {@code null} if successful
+     * @throws IOException If the listener failed
+     */
+    protected void signalTornDownExplicitTunnel(
+            PortForwardingEventListener listener, SshdSocketAddress boundAddress, boolean localForwarding, Throwable reason)
+                    throws IOException {
+        if (listener != null) {
+            listener.tornDownExplicitTunnel(getSession(), boundAddress, localForwarding, reason);
+        }
+    }
+
     @Override
     public synchronized SshdSocketAddress 
startDynamicPortForwarding(SshdSocketAddress local) throws IOException {
         Objects.requireNonNull(local, "Local address is null");
@@ -328,8 +427,7 @@ public class DefaultTcpipForwarder
         SocksProxy prev;
         InetSocketAddress bound;
         int port;
-        PortForwardingEventListener listener = 
getPortForwardingEventListenerProxy();
-        listener.establishingDynamicTunnel(getSession(), local);
+        signalEstablishingDynamicTunnel(local);
         try {
             bound = doBind(local, socksProxyIoHandlerFactory);
             port = bound.getPort();
@@ -346,7 +444,7 @@ public class DefaultTcpipForwarder
             } catch (IOException | RuntimeException err) {
                 e.addSuppressed(err);
             }
-            listener.establishedDynamicTunnel(getSession(), local, null, e);
+            signalEstablishedDynamicTunnel(local, null, e);
             throw e;
         }
 
@@ -356,7 +454,7 @@ public class DefaultTcpipForwarder
                 log.debug("startDynamicPortForwarding(" + local + "): " + 
result);
             }
 
-            listener.establishedDynamicTunnel(getSession(), local, result, 
null);
+            signalEstablishedDynamicTunnel(local, result, null);
             return result;
         } catch (IOException | RuntimeException e) {
             stopDynamicPortForwarding(local);
@@ -364,6 +462,67 @@ public class DefaultTcpipForwarder
         }
     }
 
+    /**
+     * Signals all relevant listeners that a dynamic tunnel establishment
+     * attempt has completed (successfully or not).
+     *
+     * @param local The requested local address
+     * @param boundAddress The actual bound address - as passed by the caller
+     * @param reason The establishment failure - {@code null} if successful
+     * @throws IOException If a listener failed - checked failures other than
+     * {@link IOException} are wrapped in one
+     */
+    protected void signalEstablishedDynamicTunnel(
+            SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException {
+        try {
+            invokePortEventListenerSignaller(l -> {
+                signalEstablishedDynamicTunnel(l, local, boundAddress, reason);
+                return null;
+            });
+        } catch (RuntimeException | Error | IOException e) {
+            // these are propagated unchanged
+            throw e;
+        } catch (Throwable t) {
+            // message names the "established" event (not "establishing") to match the signal sent
+            throw new IOException("Failed (" + t.getClass().getSimpleName() + ")"
+                    + " to signal established dynamic tunnel for local=" + local
+                    + " on bound=" + boundAddress, t);
+        }
+    }
+
+    /**
+     * Signals a single listener that a dynamic tunnel establishment attempt has completed.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param local The requested local address
+     * @param boundAddress The actual bound address - as passed by the caller
+     * @param reason The establishment failure - {@code null} if successful
+     * @throws IOException If the listener failed
+     */
+    protected void signalEstablishedDynamicTunnel(PortForwardingEventListener listener,
+                SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException {
+        if (listener != null) {
+            listener.establishedDynamicTunnel(getSession(), local, boundAddress, reason);
+        }
+    }
+
+    /**
+     * Signals all relevant listeners that a dynamic tunnel is about to be established.
+     *
+     * @param local The requested local address
+     * @throws IOException If a listener failed - checked failures other than
+     * {@link IOException} are wrapped in one
+     */
+    protected void signalEstablishingDynamicTunnel(SshdSocketAddress local) throws IOException {
+        try {
+            invokePortEventListenerSignaller(listener -> {
+                signalEstablishingDynamicTunnel(listener, local);
+                return null;
+            });
+        } catch (RuntimeException | Error | IOException e) {
+            // these are propagated unchanged
+            throw e;
+        } catch (Throwable t) {
+            throw new IOException("Failed (" + t.getClass().getSimpleName() + ")"
+                    + " to signal establishing dynamic tunnel for local=" + local, t);
+        }
+    }
+
+    /**
+     * Signals a single listener that a dynamic tunnel is about to be established.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param local The requested local address
+     * @throws IOException If the listener failed
+     */
+    protected void signalEstablishingDynamicTunnel(PortForwardingEventListener listener, SshdSocketAddress local) throws IOException {
+        if (listener != null) {
+            listener.establishingDynamicTunnel(getSession(), local);
+        }
+    }
+
     @Override
     public synchronized void stopDynamicPortForwarding(SshdSocketAddress 
local) throws IOException {
         SocksProxy obj;
@@ -376,17 +535,16 @@ public class DefaultTcpipForwarder
                 log.debug("stopDynamicPortForwarding(" + local + ") 
unbinding");
             }
 
-            PortForwardingEventListener listener = 
getPortForwardingEventListenerProxy();
-            listener.tearingDownDynamicTunnel(sessionInstance, local);
+            signalTearingDownDynamicTunnel(local);
             try {
                 obj.close(true);
                 acceptor.unbind(local.toInetSocketAddress());
             } catch (RuntimeException e) {
-                listener.tornDownDynamicTunnel(getSession(), local, e);
+                signalTornDownDynamicTunnel(local, e);
                 throw e;
             }
 
-            listener.tornDownDynamicTunnel(getSession(), local, null);
+            signalTornDownDynamicTunnel(local, null);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("stopDynamicPortForwarding(" + local + ") no binding 
found");
@@ -394,6 +552,64 @@ public class DefaultTcpipForwarder
         }
     }
 
+    /**
+     * Signals all relevant listeners that a dynamic tunnel is about to be torn down.
+     *
+     * @param address The tunnel address
+     * @throws IOException If a listener failed - checked failures other than
+     * {@link IOException} are wrapped in one
+     */
+    protected void signalTearingDownDynamicTunnel(SshdSocketAddress address) throws IOException {
+        try {
+            invokePortEventListenerSignaller(listener -> {
+                signalTearingDownDynamicTunnel(listener, address);
+                return null;
+            });
+        } catch (RuntimeException | Error | IOException e) {
+            // these are propagated unchanged
+            throw e;
+        } catch (Throwable t) {
+            throw new IOException("Failed (" + t.getClass().getSimpleName() + ")"
+                    + " to signal tearing down dynamic tunnel for address=" + address, t);
+        }
+    }
+
+    /**
+     * Signals a single listener that a dynamic tunnel is about to be torn down.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param address The tunnel address
+     * @throws IOException If the listener failed
+     */
+    protected void signalTearingDownDynamicTunnel(PortForwardingEventListener listener, SshdSocketAddress address) throws IOException {
+        if (listener != null) {
+            listener.tearingDownDynamicTunnel(getSession(), address);
+        }
+    }
+
+    /**
+     * Signals all relevant listeners that a dynamic tunnel has been torn down.
+     *
+     * @param address The tunnel address
+     * @param reason The tear-down failure - {@code null} if successful
+     * @throws IOException If a listener failed - checked failures other than
+     * {@link IOException} are wrapped in one
+     */
+    protected void signalTornDownDynamicTunnel(SshdSocketAddress address, Throwable reason) throws IOException {
+        try {
+            invokePortEventListenerSignaller(listener -> {
+                signalTornDownDynamicTunnel(listener, address, reason);
+                return null;
+            });
+        } catch (RuntimeException | Error | IOException e) {
+            // these are propagated unchanged
+            throw e;
+        } catch (Throwable t) {
+            throw new IOException("Failed (" + t.getClass().getSimpleName() + ")"
+                    + " to signal torn down dynamic tunnel for address=" + address, t);
+        }
+    }
+
+    /**
+     * Signals a single listener that a dynamic tunnel has been torn down.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param address The tunnel address
+     * @param reason The tear-down failure - {@code null} if successful
+     * @throws IOException If the listener failed
+     */
+    protected void signalTornDownDynamicTunnel(
+            PortForwardingEventListener listener, SshdSocketAddress address, Throwable reason)
+                    throws IOException {
+        if (listener != null) {
+            listener.tornDownDynamicTunnel(getSession(), address, reason);
+        }
+    }
+
     @Override
     public synchronized SshdSocketAddress getForwardedPort(int remotePort) {
         synchronized (remoteToLocal) {
@@ -425,8 +641,7 @@ public class DefaultTcpipForwarder
             throw new RuntimeSshException(e);
         }
 
-        PortForwardingEventListener listener = 
getPortForwardingEventListenerProxy();
-        listener.establishingExplicitTunnel(getSession(), local, null, true);
+        signalEstablishingExplicitTunnel(local, null, true);
         SshdSocketAddress result;
         try {
             InetSocketAddress bound = doBind(local, staticIoHandlerFactory);
@@ -450,12 +665,12 @@ public class DefaultTcpipForwarder
             } catch (IOException | RuntimeException err) {
                 e.addSuppressed(e);
             }
-            listener.establishedExplicitTunnel(getSession(), local, null, 
true, null, e);
+            signalEstablishedExplicitTunnel(local, null, true, null, e);
             throw e;
         }
 
         try {
-            listener.establishedExplicitTunnel(getSession(), local, null, 
true, result, null);
+            signalEstablishedExplicitTunnel(local, null, true, result, null);
             return result;
         } catch (IOException | RuntimeException e) {
             throw e;
@@ -477,16 +692,15 @@ public class DefaultTcpipForwarder
                 log.debug("localPortForwardingCancelled(" + local + ") unbind 
" + entry);
             }
 
-            PortForwardingEventListener listener = 
getPortForwardingEventListenerProxy();
-            listener.tearingDownExplicitTunnel(getSession(), entry, true);
+            signalTearingDownExplicitTunnel(entry, true);
             try {
                 acceptor.unbind(entry.toInetSocketAddress());
             } catch (RuntimeException e) {
-                listener.tornDownExplicitTunnel(getSession(), entry, true, e);
+                signalTornDownExplicitTunnel(entry, true, e);
                 throw e;
             }
 
-            listener.tornDownExplicitTunnel(getSession(), entry, true, null);
+            signalTornDownExplicitTunnel(entry, true, null);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("localPortForwardingCancelled(" + local + ") no 
match/acceptor: " + entry);
@@ -494,6 +708,159 @@ public class DefaultTcpipForwarder
         }
     }
 
+    /**
+     * Signals all relevant listeners that an explicit tunnel is about to be established.
+     *
+     * @param local The local address
+     * @param remote The remote address - callers in this class may pass {@code null}
+     * @param localForwarding Local/remote forwarding indicator
+     * @throws IOException If a listener failed - checked failures other than
+     * {@link IOException} are wrapped in one
+     */
+    protected void signalEstablishingExplicitTunnel(
+            SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+                    throws IOException {
+        try {
+            invokePortEventListenerSignaller(listener -> {
+                signalEstablishingExplicitTunnel(listener, local, remote, localForwarding);
+                return null;
+            });
+        } catch (RuntimeException | Error | IOException e) {
+            // these are propagated unchanged
+            throw e;
+        } catch (Throwable t) {
+            throw new IOException("Failed (" + t.getClass().getSimpleName() + ")"
+                    + " to signal establishing explicit tunnel for local=" + local
+                    + ", remote=" + remote + ", localForwarding=" + localForwarding, t);
+        }
+    }
+
+    /**
+     * Signals a single listener that an explicit tunnel is about to be established.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param local The local address
+     * @param remote The remote address
+     * @param localForwarding Local/remote forwarding indicator
+     * @throws IOException If the listener failed
+     */
+    protected void signalEstablishingExplicitTunnel(PortForwardingEventListener listener,
+            SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+                    throws IOException {
+        if (listener != null) {
+            listener.establishingExplicitTunnel(getSession(), local, remote, localForwarding);
+        }
+    }
+
+    /**
+     * Signals all relevant listeners that an explicit tunnel establishment
+     * attempt has completed (successfully or not).
+     *
+     * @param local The local address
+     * @param remote The remote address - callers in this class may pass {@code null}
+     * @param localForwarding Local/remote forwarding indicator
+     * @param boundAddress The actual bound address - as passed by the caller
+     * @param reason The establishment failure - {@code null} if successful
+     * @throws IOException If a listener failed - checked failures other than
+     * {@link IOException} are wrapped in one
+     */
+    protected void signalEstablishedExplicitTunnel(
+            SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding,
+            SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException {
+        try {
+            invokePortEventListenerSignaller(listener -> {
+                signalEstablishedExplicitTunnel(listener, local, remote, localForwarding, boundAddress, reason);
+                return null;
+            });
+        } catch (RuntimeException | Error | IOException e) {
+            // these are propagated unchanged
+            throw e;
+        } catch (Throwable t) {
+            throw new IOException("Failed (" + t.getClass().getSimpleName() + ")"
+                    + " to signal established explicit tunnel for local=" + local
+                    + ", remote=" + remote + ", localForwarding=" + localForwarding
+                    + ", bound=" + boundAddress, t);
+        }
+    }
+
+    /**
+     * Signals a single listener that an explicit tunnel establishment attempt has completed.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param local The local address
+     * @param remote The remote address
+     * @param localForwarding Local/remote forwarding indicator
+     * @param boundAddress The actual bound address - as passed by the caller
+     * @param reason The establishment failure - {@code null} if successful
+     * @throws IOException If the listener failed
+     */
+    protected void signalEstablishedExplicitTunnel(PortForwardingEventListener listener,
+            SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding,
+            SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException {
+        if (listener != null) {
+            listener.establishedExplicitTunnel(getSession(), local, remote, localForwarding, boundAddress, reason);
+        }
+    }
+
+    /**
+     * Applies the invoker first to the default listeners and then to the
+     * listeners exposed by the registered managers. A failure in the first
+     * group does not prevent the second group from being signalled.
+     *
+     * @param invoker The signalling code to apply to each listener
+     * @throws Throwable The accumulated failure(s), if any, after both groups
+     * have been attempted
+     */
+    protected void invokePortEventListenerSignaller(Invoker<PortForwardingEventListener, Void> invoker) throws Throwable {
+        Throwable failure = null;
+        try {
+            invokePortEventListenerSignallerListeners(getDefaultListeners(), invoker);
+        } catch (Throwable t) {
+            failure = GenericUtils.accumulateException(failure, GenericUtils.peelException(t));
+        }
+
+        try {
+            invokePortEventListenerSignallerHolders(managersHolder, invoker);
+        } catch (Throwable t) {
+            failure = GenericUtils.accumulateException(failure, GenericUtils.peelException(t));
+        }
+
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Applies the invoker to every non-{@code null} listener. A failing
+     * listener does not stop the iteration - failures are accumulated and
+     * thrown once all listeners have been attempted.
+     *
+     * @param listeners The listeners to signal - ignored if empty
+     * @param invoker The signalling code to apply to each listener
+     * @throws Throwable The accumulated failure(s), if any
+     */
+    protected void invokePortEventListenerSignallerListeners(
+            Collection<? extends PortForwardingEventListener> listeners, Invoker<PortForwardingEventListener, Void> invoker)
+                    throws Throwable {
+        if (GenericUtils.isEmpty(listeners)) {
+            return;
+        }
+
+        Throwable failure = null;
+        // Need to go over the hierarchy (session, factory managed, connection service, etc...)
+        for (PortForwardingEventListener candidate : listeners) {
+            if (candidate != null) {
+                try {
+                    invoker.invoke(candidate);
+                } catch (Throwable t) {
+                    failure = GenericUtils.accumulateException(failure, GenericUtils.peelException(t));
+                }
+            }
+        }
+
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Applies the invoker to the listener proxy of every manager and, for
+     * managers that are themselves holders, recursively to their registered
+     * managers. Failures are accumulated and thrown once all managers have
+     * been attempted.
+     *
+     * @param holders The managers to go over - ignored if empty
+     * @param invoker The signalling code to apply to each listener proxy
+     * @throws Throwable The accumulated failure(s), if any
+     */
+    protected void invokePortEventListenerSignallerHolders(
+            Collection<? extends PortForwardingEventListenerManager> holders, Invoker<PortForwardingEventListener, Void> invoker)
+                    throws Throwable {
+        if (GenericUtils.isEmpty(holders)) {
+            return;
+        }
+
+        Throwable failure = null;
+        // Need to go over the hierarchy (session, factory managed, connection service, etc...)
+        for (PortForwardingEventListenerManager mgr : holders) {
+            try {
+                PortForwardingEventListener proxy = mgr.getPortForwardingEventListenerProxy();
+                if (proxy != null) {
+                    invoker.invoke(proxy);
+                }
+            } catch (Throwable t) {
+                failure = GenericUtils.accumulateException(failure, GenericUtils.peelException(t));
+            }
+
+            if (!(mgr instanceof PortForwardingEventListenerManagerHolder)) {
+                continue;
+            }
+
+            // the manager holds further managers - descend into them as well
+            try {
+                PortForwardingEventListenerManagerHolder nested = (PortForwardingEventListenerManagerHolder) mgr;
+                invokePortEventListenerSignallerHolders(nested.getRegisteredManagers(), invoker);
+            } catch (Throwable t) {
+                failure = GenericUtils.accumulateException(failure, GenericUtils.peelException(t));
+            }
+        }
+
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
     @Override
     protected synchronized Closeable getInnerCloseable() {
         return 
builder().parallel(dynamicLocal.values()).close(acceptor).build();
@@ -502,6 +869,7 @@ public class DefaultTcpipForwarder
     @Override
     protected void preClose() {
         this.listeners.clear();
+        this.managersHolder.clear();
         super.preClose();
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
index b100524..808f41d 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
@@ -19,7 +19,7 @@
 package org.apache.sshd.common.forward;
 
 import java.util.Collection;
-import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.util.EventListenerUtils;
@@ -40,10 +40,14 @@ public class DefaultTcpipForwarderFactory implements 
TcpipForwarderFactory, Port
         public void 
removePortForwardingEventListener(PortForwardingEventListener listener) {
             throw new 
UnsupportedOperationException("removePortForwardingEventListener(" + listener + 
") N/A on default instance");
         }
+
+        /**
+         * @return Always {@link PortForwardingEventListener#EMPTY}
+         */
+        @Override
+        public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+            return PortForwardingEventListener.EMPTY;
+        }
     };
 
-    private final Collection<PortForwardingEventListener> listeners =
-            EventListenerUtils.synchronizedListenersSet();
+    private final Collection<PortForwardingEventListener> listeners = new 
CopyOnWriteArraySet<>();
     private final PortForwardingEventListener listenerProxy;
 
     public DefaultTcpipForwarderFactory() {
@@ -57,7 +61,7 @@ public class DefaultTcpipForwarderFactory implements 
TcpipForwarderFactory, Port
 
     @Override
     public void addPortForwardingEventListener(PortForwardingEventListener 
listener) {
-        listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+        listeners.add(PortForwardingEventListener.validateListener(listener));
     }
 
     @Override
@@ -66,13 +70,13 @@ public class DefaultTcpipForwarderFactory implements 
TcpipForwarderFactory, Port
             return;
         }
 
-        listeners.remove(listener);
+        
listeners.remove(PortForwardingEventListener.validateListener(listener));
     }
 
     @Override
     public TcpipForwarder create(ConnectionService service) {
         TcpipForwarder forwarder = new DefaultTcpipForwarder(service);
-        
forwarder.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+        forwarder.addPortForwardingEventListenerManager(this);
         return forwarder;
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
index 857fbfe..fde2d6d 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
@@ -154,4 +154,8 @@ public interface PortForwardingEventListener extends 
SshdEventListener {
     default void tornDownDynamicTunnel(Session session, SshdSocketAddress 
address, Throwable reason) throws IOException {
         // ignored
     }
+
+    /**
+     * Validates a listener via {@code SshdEventListener.validateListener},
+     * using this interface's simple name as the listener type description.
+     *
+     * @param <L> The listener type
+     * @param listener The listener to validate
+     * @return The value returned by the delegated validation
+     */
+    static <L extends PortForwardingEventListener> L validateListener(L listener) {
+        String listenerType = PortForwardingEventListener.class.getSimpleName();
+        return SshdEventListener.validateListener(listener, listenerType);
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManagerHolder.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManagerHolder.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManagerHolder.java
new file mode 100644
index 0000000..71a607b
--- /dev/null
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManagerHolder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.forward;
+
+import java.util.Collection;
+
+/**
+ * Holds a set of registered {@link PortForwardingEventListenerManager}s.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface PortForwardingEventListenerManagerHolder {
+    /**
+     * @return The currently registered managers. <B>Note:</B> it is highly
+     * recommended that implementors return either an un-modifiable collection
+     * or a <U>copy</U> of the current one. Callers should avoid modifying
+     * the retrieved value.
+     */
+    Collection<PortForwardingEventListenerManager> getRegisteredManagers();
+
+    /**
+     * @param manager The manager to register
+     * @return Presumably {@code true} if the manager was actually added (i.e.,
+     * not already registered) - confirm against the implementation
+     */
+    boolean 
addPortForwardingEventListenerManager(PortForwardingEventListenerManager 
manager);
+
+    /**
+     * @param manager The manager to un-register
+     * @return Presumably {@code true} if the manager was registered and has
+     * been removed - confirm against the implementation
+     */
+    boolean 
removePortForwardingEventListenerManager(PortForwardingEventListenerManager 
manager);
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
index 07f4405..5e3d9ce 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
@@ -24,7 +24,11 @@ import java.io.IOException;
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.util.net.SshdSocketAddress;
 
-public interface TcpipForwarder extends PortForwardingManager, 
PortForwardingEventListenerManager, Closeable {
+public interface TcpipForwarder
+        extends PortForwardingManager,
+                PortForwardingEventListenerManager,
+                PortForwardingEventListenerManagerHolder,
+                Closeable {
     /**
      * @param remotePort The remote port
      * @return The local {@link SshdSocketAddress} that the remote port is 
forwarded to

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
index 0b4802d..03adb22 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -76,14 +77,11 @@ public abstract class AbstractFactoryManager extends 
AbstractKexFactoryManager i
     protected List<RequestHandler<ConnectionService>> globalRequestHandlers;
     protected SessionTimeoutListener sessionTimeoutListener;
     protected ScheduledFuture<?> timeoutListenerFuture;
-    protected final Collection<SessionListener> sessionListeners =
-            EventListenerUtils.synchronizedListenersSet();
+    protected final Collection<SessionListener> sessionListeners = new 
CopyOnWriteArraySet<>();
     protected final SessionListener sessionListenerProxy;
-    protected final Collection<ChannelListener> channelListeners =
-            EventListenerUtils.synchronizedListenersSet();
+    protected final Collection<ChannelListener> channelListeners = new 
CopyOnWriteArraySet<>();
     protected final ChannelListener channelListenerProxy;
-    protected final Collection<PortForwardingEventListener> tunnelListeners =
-            EventListenerUtils.synchronizedListenersSet();
+    protected final Collection<PortForwardingEventListener> tunnelListeners = 
new CopyOnWriteArraySet<>();
     protected final PortForwardingEventListener tunnelListenerProxy;
 
     private final Map<String, Object> properties = new ConcurrentHashMap<>();
@@ -275,7 +273,8 @@ public abstract class AbstractFactoryManager extends 
AbstractKexFactoryManager i
 
     @Override
     public void addSessionListener(SessionListener listener) {
-        ValidateUtils.checkNotNull(listener, "addSessionListener(%s) null 
instance", this);
+        SessionListener.validateListener(listener);
+
         // avoid race conditions on notifications while manager is being closed
         if (!isOpen()) {
             log.warn("addSessionListener({})[{}] ignore registration while 
manager is closing", this, listener);
@@ -295,6 +294,12 @@ public abstract class AbstractFactoryManager extends 
AbstractKexFactoryManager i
 
     @Override
     public void removeSessionListener(SessionListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        SessionListener.validateListener(listener);
+
         if (this.sessionListeners.remove(listener)) {
             if (log.isTraceEnabled()) {
                 log.trace("removeSessionListener({})[{}] removed", this, 
listener);
@@ -313,7 +318,8 @@ public abstract class AbstractFactoryManager extends 
AbstractKexFactoryManager i
 
     @Override
     public void addChannelListener(ChannelListener listener) {
-        ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null 
instance", this);
+        ChannelListener.validateListener(listener);
+
         // avoid race conditions on notifications while manager is being closed
         if (!isOpen()) {
             log.warn("addChannelListener({})[{}] ignore registration while 
session is closing", this, listener);
@@ -333,6 +339,11 @@ public abstract class AbstractFactoryManager extends 
AbstractKexFactoryManager i
 
     @Override
     public void removeChannelListener(ChannelListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        ChannelListener.validateListener(listener);
         if (this.channelListeners.remove(listener)) {
             if (log.isTraceEnabled()) {
                 log.trace("removeChannelListener({})[{}] removed", this, 
listener);
@@ -356,7 +367,8 @@ public abstract class AbstractFactoryManager extends 
AbstractKexFactoryManager i
 
     @Override
     public void addPortForwardingEventListener(PortForwardingEventListener 
listener) {
-        ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null 
instance", this);
+        PortForwardingEventListener.validateListener(listener);
+
         // avoid race conditions on notifications while session is being closed
         if (!isOpen()) {
             log.warn("addPortForwardingEventListener({})[{}] ignore 
registration while session is closing", this, listener);
@@ -380,6 +392,7 @@ public abstract class AbstractFactoryManager extends 
AbstractKexFactoryManager i
             return;
         }
 
+        PortForwardingEventListener.validateListener(listener);
         if (this.tunnelListeners.remove(listener)) {
             if (log.isTraceEnabled()) {
                 log.trace("removePortForwardingEventListener({})[{}] removed", 
this, listener);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTransferEventListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTransferEventListener.java b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTransferEventListener.java
index 3acae88..d7954e0 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTransferEventListener.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTransferEventListener.java
@@ -98,4 +98,8 @@ public interface ScpTransferEventListener extends 
SshdEventListener {
             throws IOException {
         // ignored
     }
+
+    static <L extends ScpTransferEventListener> L validateListener(L listener) 
{
+        return SshdEventListener.validateListener(listener, 
ScpTransferEventListener.class.getSimpleName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java
index 779e215..1d80778 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java
@@ -21,6 +21,7 @@ package org.apache.sshd.common.session;
 
 import java.util.Collection;
 import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.sshd.common.forward.PortForwardingEventListener;
 import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
@@ -31,8 +32,7 @@ import 
org.apache.sshd.common.util.logging.AbstractLoggingBean;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public abstract class AbstractConnectionServiceFactory extends 
AbstractLoggingBean implements PortForwardingEventListenerManager {
-    private final Collection<PortForwardingEventListener> listeners =
-            EventListenerUtils.synchronizedListenersSet();
+    private final Collection<PortForwardingEventListener> listeners = new 
CopyOnWriteArraySet<>();
     private final PortForwardingEventListener listenerProxy;
 
     protected AbstractConnectionServiceFactory() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
index a07046e..9795e5c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
@@ -24,6 +24,7 @@ import org.apache.sshd.agent.common.AgentForwardSupport;
 import org.apache.sshd.common.Service;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
+import org.apache.sshd.common.forward.PortForwardingEventListenerManagerHolder;
 import org.apache.sshd.common.forward.TcpipForwarder;
 import org.apache.sshd.server.x11.X11ForwardSupport;
 
@@ -32,7 +33,7 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public interface ConnectionService extends Service, 
PortForwardingEventListenerManager {
+public interface ConnectionService extends Service, 
PortForwardingEventListenerManager, PortForwardingEventListenerManagerHolder {
     /**
      * Register a newly created channel with a new unique identifier
      *

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListener.java b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListener.java
index 8466ee7..f3280dd 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListener.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListener.java
@@ -101,4 +101,8 @@ public interface SessionListener extends SshdEventListener {
     default void sessionClosed(Session session) {
         // ignored
     }
+
+    static <L extends SessionListener> L validateListener(L listener) {
+        return SshdEventListener.validateListener(listener, 
SessionListener.class.getSimpleName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 368045e..60f98e8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -19,11 +19,14 @@
 package org.apache.sshd.common.session.helpers;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -38,12 +41,13 @@ import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.AbstractChannel;
 import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.OpenChannelException;
 import org.apache.sshd.common.channel.RequestHandler;
 import org.apache.sshd.common.channel.Window;
 import org.apache.sshd.common.forward.PortForwardingEventListener;
+import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
 import org.apache.sshd.common.forward.TcpipForwarder;
 import org.apache.sshd.common.forward.TcpipForwarderFactory;
 import org.apache.sshd.common.io.AbstractIoWriteFuture;
@@ -98,10 +102,9 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession>
     private final AtomicReference<X11ForwardSupport> x11ForwardHolder = new 
AtomicReference<>();
     private final AtomicReference<TcpipForwarder> tcpipForwarderHolder = new 
AtomicReference<>();
     private final AtomicBoolean allowMoreSessions = new AtomicBoolean(true);
-    private final Collection<PortForwardingEventListener> listeners =
-            EventListenerUtils.synchronizedListenersSet();
+    private final Collection<PortForwardingEventListener> listeners = new 
CopyOnWriteArraySet<>();
+    private final Collection<PortForwardingEventListenerManager> 
managersHolder = new CopyOnWriteArraySet<>();
     private final PortForwardingEventListener listenerProxy;
-
     private final S sessionInstance;
 
     protected AbstractConnectionService(S session) {
@@ -116,7 +119,7 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession>
 
     @Override
     public void addPortForwardingEventListener(PortForwardingEventListener 
listener) {
-        listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+        listeners.add(PortForwardingEventListener.validateListener(listener));
     }
 
     @Override
@@ -125,7 +128,26 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession>
             return;
         }
 
-        listeners.remove(listener);
+        
listeners.remove(PortForwardingEventListener.validateListener(listener));
+    }
+
+    @Override
+    public Collection<PortForwardingEventListenerManager> 
getRegisteredManagers() {
+        return managersHolder.isEmpty() ? Collections.emptyList() : new 
ArrayList<>(managersHolder);
+    }
+
+    @Override
+    public boolean 
addPortForwardingEventListenerManager(PortForwardingEventListenerManager 
manager) {
+        return managersHolder.add(Objects.requireNonNull(manager, "No 
manager"));
+    }
+
+    @Override
+    public boolean 
removePortForwardingEventListenerManager(PortForwardingEventListenerManager 
manager) {
+        if (manager == null) {
+            return false;
+        }
+
+        return managersHolder.remove(manager);
     }
 
     public Collection<Channel> getChannels() {
@@ -165,6 +187,7 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession>
     @Override
     protected void preClose() {
         this.listeners.clear();
+        this.managersHolder.clear();
         super.preClose();
     }
 
@@ -174,8 +197,7 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession>
         TcpipForwarderFactory factory =
                 Objects.requireNonNull(manager.getTcpipForwarderFactory(), "No 
forwarder factory");
         TcpipForwarder forwarder = factory.create(this);
-        
forwarder.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
-        
forwarder.addPortForwardingEventListener(session.getPortForwardingEventListenerProxy());
+        forwarder.addPortForwardingEventListenerManager(this);
         return forwarder;
     }
 
@@ -272,26 +294,9 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession>
 
     protected void handleChannelRegistrationFailure(Channel channel, int 
channelId) throws IOException {
         RuntimeException reason = new IllegalStateException("Channel id=" + 
channelId + " not registered because session is being closed: " + this);
-        ChannelListener listener = channel.getChannelListenerProxy();
-        try {
-            listener.channelClosed(channel, reason);
-        } catch (Throwable err) {
-            Throwable ignored = GenericUtils.peelException(err);
-            log.warn("registerChannel({})[{}] failed ({}) to inform of channel 
closure: {}",
-                     this, channel, ignored.getClass().getSimpleName(), 
ignored.getMessage());
-            if (log.isDebugEnabled()) {
-                log.debug("registerChannel(" + this + ")[" + channel + "] 
inform closure failure details", ignored);
-            }
-            if (log.isTraceEnabled()) {
-                Throwable[] suppressed = ignored.getSuppressed();
-                if (GenericUtils.length(suppressed) > 0) {
-                    for (Throwable s : suppressed) {
-                        log.trace("registerChannel(" + this + ")[" + channel + 
"] suppressed channel closed signalling", s);
-                    }
-                }
-            }
-        }
-
+        AbstractChannel notifier =
+                ValidateUtils.checkInstanceOf(channel, AbstractChannel.class, 
"Non abstract channel for id=%d", channelId);
+        notifier.signalChannelClosed(reason);
         throw reason;
     }
 

Reply via email to