Repository: mina-sshd
Updated Branches:
  refs/heads/master 6c92dcce8 -> 38c84a830


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 97b5e4d..dc09e33 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -78,6 +79,7 @@ import org.apache.sshd.common.session.SessionListener;
 import org.apache.sshd.common.session.SessionWorkBuffer;
 import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.Invoker;
 import org.apache.sshd.common.util.NumberUtils;
 import org.apache.sshd.common.util.Pair;
 import org.apache.sshd.common.util.Readable;
@@ -133,22 +135,19 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
     /**
      * Session listeners container
      */
-    protected final Collection<SessionListener> sessionListeners =
-            EventListenerUtils.synchronizedListenersSet();
+    protected final Collection<SessionListener> sessionListeners = new 
CopyOnWriteArraySet<>();
     protected final SessionListener sessionListenerProxy;
 
     /**
      * Channel events listener container
      */
-    protected final Collection<ChannelListener> channelListeners =
-            EventListenerUtils.synchronizedListenersSet();
+    protected final Collection<ChannelListener> channelListeners = new 
CopyOnWriteArraySet<>();
     protected final ChannelListener channelListenerProxy;
 
     /**
      * Port forwarding events listener container
      */
-    protected final Collection<PortForwardingEventListener> tunnelListeners =
-            EventListenerUtils.synchronizedListenersSet();
+    protected final Collection<PortForwardingEventListener> tunnelListeners = 
new CopyOnWriteArraySet<>();
     protected final PortForwardingEventListener tunnelListenerProxy;
 
     /*
@@ -265,11 +264,36 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
         sessionListenerProxy = 
EventListenerUtils.proxyWrapper(SessionListener.class, loader, 
sessionListeners);
         channelListenerProxy = 
EventListenerUtils.proxyWrapper(ChannelListener.class, loader, 
channelListeners);
         tunnelListenerProxy = 
EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, loader, 
tunnelListeners);
+    }
+
+    protected void signalSessionCreated(IoSession ioSession) throws Exception {
+        try {
+            invokeSessionSignaller(l -> {
+                signalSessionCreated(l);
+                return null;
+            });
+        } catch (Throwable err) {
+            Throwable e = GenericUtils.peelException(err);
+            if (log.isDebugEnabled()) {
+                log.debug("Failed ({}) to announce session={} created: {}",
+                          e.getClass().getSimpleName(), ioSession, 
e.getMessage());
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Session=" + ioSession + " creation failure 
details", e);
+            }
+            if (e instanceof Exception) {
+                throw (Exception) e;
+            } else {
+                throw new RuntimeSshException(e);
+            }
+        }
+    }
 
-        // Delegate the task of further notifications to the session
-        addSessionListener(factoryManager.getSessionListenerProxy());
-        addChannelListener(factoryManager.getChannelListenerProxy());
-        
addPortForwardingEventListener(factoryManager.getPortForwardingEventListenerProxy());
+    protected void signalSessionCreated(SessionListener listener) {
+        if (listener == null) {
+            return;
+        }
+        listener.sessionCreated(this);
     }
 
     /**
@@ -404,7 +428,7 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
     @Override
     public void setAuthenticated() throws IOException {
         this.authed = true;
-        sendSessionEvent(SessionListener.Event.Authenticated);
+        signalSessionEvent(SessionListener.Event.Authenticated);
     }
 
     /**
@@ -736,7 +760,7 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
                 "Unknown negotiated KEX algorithm: %s",
                 kexAlgorithm);
         kex.init(this, serverVersion.getBytes(StandardCharsets.UTF_8), 
clientVersion.getBytes(StandardCharsets.UTF_8), i_s, i_c);
-        sendSessionEvent(SessionListener.Event.KexCompleted);
+        signalSessionEvent(SessionListener.Event.KexCompleted);
     }
 
     protected void handleNewKeys(int cmd, Buffer buffer) throws Exception {
@@ -756,7 +780,7 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
             }
         }
 
-        sendSessionEvent(SessionListener.Event.KeyEstablished);
+        signalSessionEvent(SessionListener.Event.KeyEstablished);
         synchronized (pendingPackets) {
             if (!pendingPackets.isEmpty()) {
                 if (log.isDebugEnabled()) {
@@ -812,24 +836,7 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
             log.debug("exceptionCaught(" + this + ")[state=" + curState + "] 
details", t);
         }
 
-        SessionListener listener = getSessionListenerProxy();
-        try {
-            listener.sessionException(this, t);
-        } catch (Throwable err) {
-            Throwable e = GenericUtils.peelException(err);
-            if (log.isDebugEnabled()) {
-                log.debug("exceptionCaught(" + this + ") signal session 
exception details", e);
-            }
-
-            if (log.isTraceEnabled()) {
-                Throwable[] suppressed = e.getSuppressed();
-                if (GenericUtils.length(suppressed) > 0) {
-                    for (Throwable s : suppressed) {
-                        log.trace("exceptionCaught(" + this + ") suppressed 
session exception signalling", s);
-                    }
-                }
-            }
-        }
+        signalExceptionCaught(t);
 
         if (State.Opened.equals(curState) && (t instanceof SshException)) {
             int code = ((SshException) t).getDisconnectCode();
@@ -852,6 +859,37 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
         close(true);
     }
 
+    protected void signalExceptionCaught(Throwable t) {
+        try {
+            invokeSessionSignaller(l -> {
+                signalExceptionCaught(l, t);
+                return null;
+            });
+        } catch (Throwable err) {
+            Throwable e = GenericUtils.peelException(err);
+            if (log.isDebugEnabled()) {
+                log.debug("exceptionCaught(" + this + ") signal session 
exception details", e);
+            }
+
+            if (log.isTraceEnabled()) {
+                Throwable[] suppressed = e.getSuppressed();
+                if (GenericUtils.length(suppressed) > 0) {
+                    for (Throwable s : suppressed) {
+                        log.trace("exceptionCaught(" + this + ") suppressed 
session exception signalling", s);
+                    }
+                }
+            }
+        }
+    }
+
+    protected void signalExceptionCaught(SessionListener listener, Throwable 
t) {
+        if (listener == null) {
+            return;
+        }
+
+        listener.sessionException(this, t);
+    }
+
     @Override
     protected Closeable getInnerCloseable() {
         return builder()
@@ -880,32 +918,48 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
         }
 
         // Fire 'close' event
-        SessionListener listener = getSessionListenerProxy();
         try {
-            listener.sessionClosed(this);
-        } catch (Throwable t) {
-            Throwable e = GenericUtils.peelException(t);
-            log.warn("preClose({}) {} while signal session closed: {}", this, 
e.getClass().getSimpleName(), e.getMessage());
+            signalSessionClosed();
+        } finally {
+            // clear the listeners since we are closing the session (quicker 
GC)
+            this.sessionListeners.clear();
+            this.channelListeners.clear();
+            this.tunnelListeners.clear();
+        }
+
+        super.preClose();
+    }
+
+    protected void signalSessionClosed() {
+        try {
+            invokeSessionSignaller(l -> {
+                signalSessionClosed(l);
+                return null;
+            });
+        } catch (Throwable err) {
+            Throwable e = GenericUtils.peelException(err);
+            log.warn("signalSessionClosed({}) {} while signal session closed: 
{}", this, e.getClass().getSimpleName(), e.getMessage());
             if (log.isDebugEnabled()) {
-                log.debug("preClose(" + this + ") signal session closed 
exception details", e);
+                log.debug("signalSessionClosed(" + this + ") signal session 
closed exception details", e);
             }
 
             if (log.isTraceEnabled()) {
                 Throwable[] suppressed = e.getSuppressed();
                 if (GenericUtils.length(suppressed) > 0) {
                     for (Throwable s : suppressed) {
-                        log.trace("preClose(" + this + ") suppressed session 
closed signalling", s);
+                        log.trace("signalSessionClosed(" + this + ") 
suppressed session closed signalling", s);
                     }
                 }
             }
-        } finally {
-            // clear the listeners since we are closing the session (quicker 
GC)
-            this.sessionListeners.clear();
-            this.channelListeners.clear();
-            this.tunnelListeners.clear();
         }
+    }
 
-        super.preClose();
+    protected void signalSessionClosed(SessionListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        listener.sessionClosed(this);
     }
 
     protected List<Service> getServices() {
@@ -1866,10 +1920,9 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
      * @return The negotiated options {@link Map}
      */
     protected Map<KexProposalOption, String> negotiate() {
-        SessionListener listener = getSessionListenerProxy();
         Map<KexProposalOption, String> c2sOptions = 
Collections.unmodifiableMap(clientProposal);
         Map<KexProposalOption, String> s2cOptions = 
Collections.unmodifiableMap(serverProposal);
-        listener.sessionNegotiationStart(this, c2sOptions, s2cOptions);
+        signalNegotiationStart(c2sOptions, s2cOptions);
 
         Map<KexProposalOption, String> guess = new 
EnumMap<>(KexProposalOption.class);
         Map<KexProposalOption, String> negotiatedGuess = 
Collections.unmodifiableMap(guess);
@@ -1914,14 +1967,69 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
                 }
             }
         } catch (RuntimeException | Error e) {
-            listener.sessionNegotiationEnd(this, c2sOptions, s2cOptions, 
negotiatedGuess, e);
+            signalNegotiationEnd(c2sOptions, s2cOptions, negotiatedGuess, e);
             throw e;
         }
 
-        listener.sessionNegotiationEnd(this, c2sOptions, s2cOptions, 
negotiatedGuess, null);
+        signalNegotiationEnd(c2sOptions, s2cOptions, negotiatedGuess, null);
         return setNegotiationResult(guess);
     }
 
+    protected void signalNegotiationStart(Map<KexProposalOption, String> 
c2sOptions, Map<KexProposalOption, String> s2cOptions) {
+        try {
+            invokeSessionSignaller(l -> {
+                signalNegotiationStart(l, c2sOptions, s2cOptions);
+                return null;
+            });
+        } catch (Throwable err) {
+            if (err instanceof RuntimeException) {
+                throw (RuntimeException) err;
+            } else if (err instanceof Error) {
+                throw (Error) err;
+            } else {
+                throw new RuntimeException(err);
+            }
+        }
+    }
+
+    protected void signalNegotiationStart(
+            SessionListener listener, Map<KexProposalOption, String> 
c2sOptions, Map<KexProposalOption, String> s2cOptions) {
+        if (listener == null) {
+            return;
+        }
+
+        listener.sessionNegotiationStart(this, c2sOptions, s2cOptions);
+    }
+
+    protected void signalNegotiationEnd(
+            Map<KexProposalOption, String> c2sOptions, Map<KexProposalOption, 
String> s2cOptions,
+            Map<KexProposalOption, String> negotiatedGuess, Throwable reason) {
+        try {
+            invokeSessionSignaller(l -> {
+                signalNegotiationEnd(l, c2sOptions, s2cOptions, 
negotiatedGuess, reason);
+                return null;
+            });
+        } catch (Throwable err) {
+            if (err instanceof RuntimeException) {
+                throw (RuntimeException) err;
+            } else if (err instanceof Error) {
+                throw (Error) err;
+            } else {
+                throw new RuntimeException(err);
+            }
+        }
+    }
+
+    protected void signalNegotiationEnd(SessionListener listener,
+            Map<KexProposalOption, String> c2sOptions, Map<KexProposalOption, 
String> s2cOptions,
+            Map<KexProposalOption, String> negotiatedGuess, Throwable reason) {
+        if (listener == null) {
+            return;
+        }
+
+        listener.sessionNegotiationEnd(this, c2sOptions, s2cOptions, 
negotiatedGuess, reason);
+    }
+
     protected Map<KexProposalOption, String> 
setNegotiationResult(Map<KexProposalOption, String> guess) {
         synchronized (negotiationResult) {
             if (!negotiationResult.isEmpty()) {
@@ -2026,7 +2134,7 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
 
     @Override
     public void addSessionListener(SessionListener listener) {
-        ValidateUtils.checkNotNull(listener, "addSessionListener(%s) null 
instance", this);
+        SessionListener.validateListener(listener);
         // avoid race conditions on notifications while session is being closed
         if (!isOpen()) {
             log.warn("addSessionListener({})[{}] ignore registration while 
session is closing", this, listener);
@@ -2046,6 +2154,11 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
 
     @Override
     public void removeSessionListener(SessionListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        SessionListener.validateListener(listener);
         if (this.sessionListeners.remove(listener)) {
             if (log.isTraceEnabled()) {
                 log.trace("removeSessionListener({})[{}] removed", this, 
listener);
@@ -2064,7 +2177,7 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
 
     @Override
     public void addChannelListener(ChannelListener listener) {
-        ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null 
instance", this);
+        ChannelListener.validateListener(listener);
         // avoid race conditions on notifications while session is being closed
         if (!isOpen()) {
             log.warn("addChannelListener({})[{}] ignore registration while 
session is closing", this, listener);
@@ -2084,6 +2197,11 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
 
     @Override
     public void removeChannelListener(ChannelListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        ChannelListener.validateListener(listener);
         if (this.channelListeners.remove(listener)) {
             if (log.isTraceEnabled()) {
                 log.trace("removeChannelListener({})[{}] removed", this, 
listener);
@@ -2107,7 +2225,7 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
 
     @Override
     public void addPortForwardingEventListener(PortForwardingEventListener 
listener) {
-        ValidateUtils.checkNotNull(listener, 
"addPortForwardingEventListener(%s) null instance", this);
+        PortForwardingEventListener.validateListener(listener);
         // avoid race conditions on notifications while session is being closed
         if (!isOpen()) {
             log.warn("addPortForwardingEventListener({})[{}] ignore 
registration while session is closing", this, listener);
@@ -2131,6 +2249,7 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
             return;
         }
 
+        PortForwardingEventListener.validateListener(listener);
         if (this.tunnelListeners.remove(listener)) {
             if (log.isTraceEnabled()) {
                 log.trace("removePortForwardingEventListener({})[{}] removed", 
this, listener);
@@ -2148,12 +2267,14 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
      * @param event The event to send
      * @throws IOException If any of the registered listeners threw an 
exception.
      */
-    protected void sendSessionEvent(SessionListener.Event event) throws 
IOException {
-        SessionListener listener = getSessionListenerProxy();
+    protected void signalSessionEvent(SessionListener.Event event) throws 
IOException {
         try {
-            listener.sessionEvent(this, event);
-        } catch (Throwable e) {
-            Throwable t = GenericUtils.peelException(e);
+            invokeSessionSignaller(l -> {
+                signalSessionEvent(l, event);
+                return null;
+            });
+        } catch (Throwable err) {
+            Throwable t = GenericUtils.peelException(err);
             if (log.isDebugEnabled()) {
                 log.debug("sendSessionEvent({})[{}] failed ({}) to inform 
listeners: {}",
                            this, event, t.getClass().getSimpleName(), 
t.getMessage());
@@ -2171,6 +2292,37 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
         }
     }
 
+    protected void signalSessionEvent(SessionListener listener, 
SessionListener.Event event) throws IOException {
+        if (listener == null) {
+            return;
+        }
+
+        listener.sessionEvent(this, event);
+    }
+
+    protected void invokeSessionSignaller(Invoker<SessionListener, Void> 
invoker) throws Throwable {
+        FactoryManager manager = getFactoryManager();
+        SessionListener[] listeners = {
+            (manager == null) ? null : manager.getSessionListenerProxy(),
+            getSessionListenerProxy()
+        };
+        Throwable err = null;
+        for (SessionListener l : listeners) {
+            if (l == null) {
+                continue;
+            }
+            try {
+                invoker.invoke(l);
+            } catch (Throwable t) {
+                err = GenericUtils.accumulateException(err, t);
+            }
+        }
+
+        if (err != null) {
+            throw err;
+        }
+    }
+
     @Override
     public KeyExchangeFuture reExchangeKeys() throws IOException {
         requestNewKeysExchange();
@@ -2224,6 +2376,10 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
     }
 
     protected boolean isRekeyRequired() {
+        if ((!isOpen()) || isClosing() || isClosed()) {
+            return false;
+        }
+
         KexState curState = kexState.get();
         if (!KexState.DONE.equals(curState)) {
             return false;
@@ -2388,7 +2544,7 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
      * @see #checkIdleTimeout(long, long)
      */
     protected void checkForTimeouts() throws IOException {
-        if (isClosing()) {
+        if ((!isOpen()) || isClosing() || isClosed()) {
             if (log.isDebugEnabled()) {
                 log.debug("checkForTimeouts({}) session closing", this);
                 return;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java 
b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
index 3304766..1620bad 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.sshd.common.util;
 
 import java.lang.reflect.Proxy;
@@ -28,14 +27,14 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 
-
 /**
  * @author <a href="mailto:d...@mina.apache.org";>Apache MINA SSHD Project</a>
  */
 public final class EventListenerUtils {
     /**
      * A special &quot;comparator&quot; whose only purpose is to ensure
-     * there are no same references in a listener's set
+     * there are no same references in a listener's set - to be used
+     * in conjunction with a {@code TreeSet} as its comparator
      */
     @SuppressWarnings("checkstyle:anoninnerlength")
     public static final Comparator<EventListener> LISTENER_INSTANCE_COMPARATOR 
= (l1, l2) -> {
@@ -90,14 +89,14 @@ public final class EventListenerUtils {
     }
 
     /**
-     * @param <L> Type of {@link EventListener} contained in the set
+     * @param <L> Type of {@link SshdEventListener} contained in the set
      * @param listeners The listeners to pre-add to the create set - ignored
      * if (@code null}/empty
      * @return A (synchronized) {@link Set} for containing the listeners 
ensuring
      * that if same listener instance is added repeatedly only <U>one</U>
      * instance is actually contained
      */
-    public static <L extends EventListener> Set<L> 
synchronizedListenersSet(Collection<? extends L> listeners) {
+    public static <L extends SshdEventListener> Set<L> 
synchronizedListenersSet(Collection<? extends L> listeners) {
         Set<L> s = EventListenerUtils.synchronizedListenersSet();
         if (GenericUtils.size(listeners) > 0) {
             s.addAll(listeners);
@@ -107,14 +106,14 @@ public final class EventListenerUtils {
     }
 
     /**
-     * @param <L> Type of {@link EventListener} contained in the set
+     * @param <L> Type of {@link SshdEventListener} contained in the set
      * @return A (synchronized) {@link Set} for containing the listeners 
ensuring
      * that if same listener instance is added repeatedly only <U>one</U>
      * instance is actually contained
      * @see #LISTENER_INSTANCE_COMPARATOR
      */
-    public static <L extends EventListener> Set<L> synchronizedListenersSet() {
-        return Collections.synchronizedSet(new 
TreeSet<>(LISTENER_INSTANCE_COMPARATOR));
+    public static <L extends SshdEventListener> Set<L> 
synchronizedListenersSet() {
+        return Collections.synchronizedSet(new 
TreeSet<L>(LISTENER_INSTANCE_COMPARATOR));
     }
 
     /**
@@ -148,7 +147,7 @@ public final class EventListenerUtils {
      * the calls to the container
      * @see #proxyWrapper(Class, ClassLoader, Iterable)
      */
-    public static <T extends EventListener> T proxyWrapper(Class<T> 
listenerType, Iterable<? extends T> listeners) {
+    public static <T extends SshdEventListener> T proxyWrapper(Class<T> 
listenerType, Iterable<? extends T> listeners) {
         return proxyWrapper(listenerType, listenerType.getClassLoader(), 
listeners);
     }
 
@@ -157,7 +156,7 @@ public final class EventListenerUtils {
      * interface implementation. <b>Note:</b> a listener interface is one whose
      * invoked methods return <u>only</u> {@code void}.
      *
-     * @param <T>          Generic listener type
+     * @param <T>          Generic {@link SshdEventListener} type
      * @param listenerType The expected listener <u>interface</u>
      * @param loader       The {@link ClassLoader} to use for the proxy
      * @param listeners    An {@link Iterable} container of listeners to be 
invoked.
@@ -186,7 +185,7 @@ public final class EventListenerUtils {
      *                                  or a {@code null} container has been 
provided
      * @see #proxyWrapper(Class, ClassLoader, Iterable)
      */
-    public static <T extends EventListener> T proxyWrapper(Class<T> 
listenerType, ClassLoader loader, final Iterable<? extends T> listeners) {
+    public static <T extends SshdEventListener> T proxyWrapper(Class<T> 
listenerType, ClassLoader loader, final Iterable<? extends T> listeners) {
         Objects.requireNonNull(listenerType, "No listener type specified");
         ValidateUtils.checkTrue(listenerType.isInterface(), "Target proxy is 
not an interface: %s", listenerType.getSimpleName());
         Objects.requireNonNull(listeners, "No listeners container provided");

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/util/Invoker.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/Invoker.java 
b/sshd-core/src/main/java/org/apache/sshd/common/util/Invoker.java
new file mode 100644
index 0000000..39f4375
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/Invoker.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.util;
+
+import java.util.Collection;
+
+/**
+ * The complement to the {@code Callable} interface - accepts one argument
+ * and possibly throws something
+ *
+ * @param <ARG> Argument type
+ * @param <RET> Return type
+ * @author <a href="mailto:d...@mina.apache.org";>Apache MINA SSHD Project</a>
+ */
+@FunctionalInterface
+public interface Invoker<ARG, RET> {
+    RET invoke(ARG arg) throws Throwable;
+
+    static <ARG> Invoker<ARG, Void> wrapAll(Collection<? extends Invoker<? 
super ARG, ?>> invokers) {
+        return arg -> {
+            invokeAll(arg, invokers);
+            return null;
+        };
+    }
+
+    /**
+     * Invokes <U>all</U> the instances ignoring the return value. Any
+     * intermediate exceptions are accumulated and thrown at the end.
+     *
+     * @param <ARG> Argument type
+     * @param arg The argument to pass to the {@link #invoke(Object)} method
+     * @param invokers The invokers to scan - ignored if {@code null}/empty
+     * (also ignores {@code null} members)
+     */
+    static <ARG> void invokeAll(ARG arg, Collection<? extends Invoker<? super 
ARG, ?>> invokers) throws Throwable {
+        if (GenericUtils.isEmpty(invokers)) {
+            return;
+        }
+
+        Throwable err = null;
+        for (Invoker<? super ARG, ?> i : invokers) {
+            if (i == null) {
+                continue;
+            }
+
+            try {
+                i.invoke(arg);
+            } catch (Throwable t) {
+                err = GenericUtils.accumulateException(err, t);
+            }
+        }
+
+        if (err != null) {
+            throw err;
+        }
+    }
+
+    static <ARG> Invoker<ARG, Void> wrapFirst(Collection<? extends Invoker<? 
super ARG, ?>> invokers) {
+        return arg -> {
+            Pair<Invoker<? super ARG, ?>, Throwable> result = 
invokeTillFirstFailure(arg, invokers);
+            if (result != null) {
+                throw result.getValue();
+            }
+            return null;
+        };
+    }
+
+    /**
+     * Invokes all instances until 1st failure (if any)
+     *
+     * @param <ARG> Argument type
+     * @param arg The argument to pass to the {@link #invoke(Object)} method
+     * @param invokers The invokers to scan - ignored if {@code null}/empty
+     * (also ignores {@code null} members)
+     * @return A {@link Pair} representing the <U>first</U> failed invocation
+     * - {@code null} if all were successful (or none invoked).
+     */
+    static <ARG> Pair<Invoker<? super ARG, ?>, Throwable> 
invokeTillFirstFailure(ARG arg, Collection<? extends Invoker<? super ARG, ?>> 
invokers) {
+        if (GenericUtils.isEmpty(invokers)) {
+            return null;
+        }
+
+        for (Invoker<? super ARG, ?> i : invokers) {
+            if (i == null) {
+                continue;
+            }
+
+            try {
+                i.invoke(arg);
+            } catch (Throwable t) {
+                return new Pair<>(i, t);
+            }
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/util/SshdEventListener.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/util/SshdEventListener.java 
b/sshd-core/src/main/java/org/apache/sshd/common/util/SshdEventListener.java
index 16dfc9f..ea6a3dd 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/SshdEventListener.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/SshdEventListener.java
@@ -19,11 +19,26 @@
 
 package org.apache.sshd.common.util;
 
+import java.lang.reflect.Proxy;
 import java.util.EventListener;
+import java.util.Objects;
 
 /**
  * @author <a href="mailto:d...@mina.apache.org";>Apache MINA SSHD Project</a>
  */
 public interface SshdEventListener extends EventListener {
-    // marker interface for quickly locating our session listeners
+
+    /**
+     * Makes sure that the listener is neither {@code null} nor a proxy
+     *
+     * @param <L> Type of {@link SshdEventListener} being validated
+     * @param listener The listener instance
+     * @param prefix Prefix text to be prepended to validation failure messages
+     * @return The validated instance
+     */
+    static <L extends SshdEventListener> L validateListener(L listener, String 
prefix) {
+        Objects.requireNonNull(listener, prefix + ": no instance");
+        ValidateUtils.checkTrue(!Proxy.isProxyClass(listener.getClass()), 
prefix + ": proxies N/A");
+        return listener;
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java 
b/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
index 8c29966..2675e5c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
@@ -32,4 +32,8 @@ public interface SignalListener extends SshdEventListener {
      * @param signal The received {@link Signal}
      */
     void signal(Signal signal);
+
+    static <L extends SignalListener> L validateListener(L listener) {
+        return SshdEventListener.validateListener(listener, 
SignalListener.class.getSimpleName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java 
b/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
index b202659..8c67a43 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
@@ -21,11 +21,10 @@ package org.apache.sshd.server;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.sshd.common.channel.PtyMode;
-import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
@@ -61,7 +60,7 @@ public class StandardEnvironment extends AbstractLoggingBean 
implements Environm
      */
     @Override
     public void addSignalListener(SignalListener listener, Collection<Signal> 
signals) {
-        Objects.requireNonNull(listener, "No listener instance");
+        SignalListener.validateListener(listener);
         ValidateUtils.checkNotNullAndNotEmpty(signals, "No signals");
 
         for (Signal s : signals) {
@@ -81,7 +80,11 @@ public class StandardEnvironment extends AbstractLoggingBean 
implements Environm
 
     @Override
     public void removeSignalListener(SignalListener listener) {
-        Objects.requireNonNull(listener, "No listener instance");
+        if (listener == null) {
+            return;
+        }
+
+        SignalListener.validateListener(listener);
         for (Signal s : Signal.SIGNALS) {
             Collection<SignalListener> ls = getSignalListeners(s, false);
             if (ls != null) {
@@ -141,7 +144,7 @@ public class StandardEnvironment extends 
AbstractLoggingBean implements Environm
             synchronized (listeners) {
                 ls = listeners.get(signal);
                 if (ls == null) {
-                    ls =  EventListenerUtils.synchronizedListenersSet();
+                    ls = new CopyOnWriteArraySet<>();
                     listeners.put(signal, ls);
                 }
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
index 35f869f..bd8e083 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
@@ -30,7 +30,6 @@ import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.AbstractChannel;
 import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.RequestHandler;
 import org.apache.sshd.common.channel.Window;
 import org.apache.sshd.common.session.Session;
@@ -87,32 +86,18 @@ public abstract class AbstractServerChannel extends 
AbstractChannel implements S
     }
 
     protected OpenFuture doInit(Buffer buffer) {
-        ChannelListener listener = getChannelListenerProxy();
         OpenFuture f = new DefaultOpenFuture(this);
+        String changeEvent = "doInit";
         try {
-            listener.channelOpenSuccess(this);
+            signalChannelOpenSuccess();
             f.setOpened();
         } catch (Throwable t) {
             Throwable e = GenericUtils.peelException(t);
-            try {
-                listener.channelOpenFailure(this, e);
-            } catch (Throwable err) {
-                Throwable ignored = GenericUtils.peelException(err);
-                log.warn("doInit({}) failed ({}) to inform listener of open 
failure={}: {}",
-                         this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
-                if (log.isDebugEnabled()) {
-                    log.debug("doInit(" + this + ") listener inform failure 
details", ignored);
-                }
-                if (log.isTraceEnabled()) {
-                    Throwable[] suppressed = ignored.getSuppressed();
-                    if (GenericUtils.length(suppressed) > 0) {
-                        for (Throwable s : suppressed) {
-                            log.trace("doInit(" + this + ") suppressed channel 
open failure signalling", s);
-                        }
-                    }
-                }
-            }
+            changeEvent = e.getClass().getSimpleName();
+            signalChannelOpenFailure(e);
             f.setException(e);
+        } finally {
+            notifyStateChanged(changeEvent);
         }
 
         return f;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index d33c47b..5ddb601 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -32,7 +32,6 @@ import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelFactory;
-import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.channel.OpenChannelException;
 import org.apache.sshd.common.channel.Window;
@@ -192,7 +191,6 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
     }
 
     protected void handleChannelConnectResult(OpenFuture f, IoConnectFuture 
future) {
-        ChannelListener listener = getChannelListenerProxy();
         try {
             if (future.isConnected()) {
                 handleChannelOpenSuccess(f, future.getSession());
@@ -205,80 +203,35 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
             }
         } catch (RuntimeException t) {
             Throwable e = GenericUtils.peelException(t);
+            signalChannelOpenFailure(e);
             try {
-                listener.channelOpenFailure(this, e);
-            } catch (Throwable err) {
-                Throwable ignored = GenericUtils.peelException(err);
-                log.warn("handleChannelConnectResult({})[exception] failed 
({}) to inform listener of open failure={}: {}",
-                         this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
-                if (log.isDebugEnabled()) {
-                    log.debug("handleChannelConnectResult(" + this + 
")[exception] listener exception details", ignored);
-                }
-                if (log.isTraceEnabled()) {
-                    Throwable[] suppressed = ignored.getSuppressed();
-                    if (GenericUtils.length(suppressed) > 0) {
-                        for (Throwable s : suppressed) {
-                            log.trace("handleChannelConnectResult(" + this + 
") suppressed channel open failure signalling", s);
-                        }
-                    }
-                }
+                f.setException(e);
+            } finally {
+                notifyStateChanged(e.getClass().getSimpleName());
             }
-            f.setException(e);
         }
     }
 
     protected void handleChannelOpenSuccess(OpenFuture f, IoSession session) {
         ioSession = session;
 
-        ChannelListener listener = getChannelListenerProxy();
+        String changeEvent = session.toString();
         try {
-            listener.channelOpenSuccess(this);
+            signalChannelOpenSuccess();
             f.setOpened();
         } catch (Throwable t) {
             Throwable e = GenericUtils.peelException(t);
-            try {
-                listener.channelOpenFailure(this, e);
-            } catch (Throwable err) {
-                Throwable ignored = GenericUtils.peelException(err);
-                log.warn("handleChannelOpenSuccess({}) failed ({}) to inform 
listener of open failure={}: {}",
-                         this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
-                if (log.isDebugEnabled()) {
-                    log.debug("doInit(" + this + ") listener inform failure 
details", ignored);
-                }
-                if (log.isTraceEnabled()) {
-                    Throwable[] suppressed = ignored.getSuppressed();
-                    if (GenericUtils.length(suppressed) > 0) {
-                        for (Throwable s : suppressed) {
-                            log.trace("handleChannelOpenSuccess(" + this + ") 
suppressed channel open failure signalling", s);
-                        }
-                    }
-                }
-            }
+            changeEvent = e.getClass().getSimpleName();
+            signalChannelOpenFailure(e);
             f.setException(e);
+        } finally {
+            notifyStateChanged(changeEvent);
         }
     }
 
     protected void handleChannelOpenFailure(OpenFuture f, Throwable problem) {
-        ChannelListener listener = getChannelListenerProxy();
-        try {
-            listener.channelOpenFailure(this, problem);
-        } catch (Throwable err) {
-            Throwable ignored = GenericUtils.peelException(err);
-            log.warn("handleChannelOpenFailure({}) failed ({}) to inform 
listener of open failure={}: {}",
-                     this, ignored.getClass().getSimpleName(), 
problem.getClass().getSimpleName(), ignored.getMessage());
-            if (log.isDebugEnabled()) {
-                log.debug("handleChannelOpenFailure(" + this + ") listener 
inform open failure details", ignored);
-            }
-            if (log.isTraceEnabled()) {
-                Throwable[] suppressed = ignored.getSuppressed();
-                if (GenericUtils.length(suppressed) > 0) {
-                    for (Throwable s : suppressed) {
-                        log.trace("handleOpenChannelFailure(" + this + ") 
suppressed channel open failure signalling", s);
-                    }
-                }
-            }
-        }
-
+        signalChannelOpenFailure(problem);
+        notifyStateChanged(problem.getClass().getSimpleName());
         closeImmediately0();
 
         if (problem instanceof ConnectException) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java 
b/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
index d53b666..67549b5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
@@ -19,6 +19,7 @@
 package org.apache.sshd.server.scp;
 
 import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.sshd.common.scp.ScpFileOpener;
@@ -106,8 +107,7 @@ public class ScpCommandFactory implements 
ScpFileOpenerHolder, CommandFactory, C
     private ScpFileOpener fileOpener;
     private int sendBufferSize = ScpHelper.MIN_SEND_BUFFER_SIZE;
     private int receiveBufferSize = ScpHelper.MIN_RECEIVE_BUFFER_SIZE;
-    private Collection<ScpTransferEventListener> listeners =
-            EventListenerUtils.synchronizedListenersSet();
+    private Collection<ScpTransferEventListener> listeners = new 
CopyOnWriteArraySet<>();
     private ScpTransferEventListener listenerProxy;
 
     public ScpCommandFactory() {
@@ -258,7 +258,7 @@ public class ScpCommandFactory implements 
ScpFileOpenerHolder, CommandFactory, C
         try {
             ScpCommandFactory other = getClass().cast(super.clone());
             // clone the listeners set as well
-            other.listeners = 
EventListenerUtils.synchronizedListenersSet(this.listeners);
+            other.listeners = new CopyOnWriteArraySet<>(this.listeners);
             other.listenerProxy = 
EventListenerUtils.proxyWrapper(ScpTransferEventListener.class, 
getClass().getClassLoader(), other.listeners);
             return other;
         } catch (CloneNotSupportedException e) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
index 6b18e23..c2e7b9c 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
@@ -41,6 +41,11 @@ public class ServerConnectionServiceFactory extends 
AbstractConnectionServiceFac
         public void 
removePortForwardingEventListener(PortForwardingEventListener listener) {
             throw new 
UnsupportedOperationException("removePortForwardingEventListener(" + listener + 
") N/A on default instance");
         }
+
+        @Override
+        public PortForwardingEventListener 
getPortForwardingEventListenerProxy() {
+            return PortForwardingEventListener.EMPTY;
+        }
     };
 
     public ServerConnectionServiceFactory() {
@@ -56,7 +61,7 @@ public class ServerConnectionServiceFactory extends 
AbstractConnectionServiceFac
     public Service create(Session session) throws IOException {
         AbstractServerSession abstractSession = 
ValidateUtils.checkInstanceOf(session, AbstractServerSession.class, "Not a 
server session: %s", session);
         ServerConnectionService service = new 
ServerConnectionService(abstractSession);
-        
service.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+        service.addPortForwardingEventListenerManager(this);
         return service;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java 
b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
index a35734e..8ff2810 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
@@ -19,9 +19,7 @@
 package org.apache.sshd.server.session;
 
 import org.apache.sshd.common.PropertyResolverUtils;
-import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.io.IoSession;
-import org.apache.sshd.common.session.SessionListener;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.server.ServerFactoryManager;
 
@@ -33,30 +31,7 @@ import org.apache.sshd.server.ServerFactoryManager;
 public class ServerSessionImpl extends AbstractServerSession {
     public ServerSessionImpl(ServerFactoryManager server, IoSession ioSession) 
throws Exception {
         super(server, ioSession);
-
-        if (log.isDebugEnabled()) {
-            log.debug("Server session created {}", ioSession);
-        }
-
-        // Inform the listener of the newly created session
-        SessionListener listener = getSessionListenerProxy();
-        try {
-            listener.sessionCreated(this);
-        } catch (Throwable t) {
-            Throwable e = GenericUtils.peelException(t);
-            if (log.isDebugEnabled()) {
-                log.debug("Failed ({}) to announce session={} created: {}",
-                          e.getClass().getSimpleName(), ioSession, 
e.getMessage());
-            }
-            if (log.isTraceEnabled()) {
-                log.trace("Session=" + ioSession + " creation failure 
details", e);
-            }
-            if (e instanceof Exception) {
-                throw (Exception) e;
-            } else {
-                throw new RuntimeSshException(e);
-            }
-        }
+        signalSessionCreated(ioSession);
 
         String headerConfig = PropertyResolverUtils.getString(this, 
ServerFactoryManager.SERVER_EXTRA_IDENTIFICATION_LINES);
         String[] headers = GenericUtils.split(headerConfig, 
ServerFactoryManager.SERVER_EXTRA_IDENT_LINES_SEPARATOR);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
index 15de46a..9289458 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
@@ -20,7 +20,7 @@
 package org.apache.sshd.server.subsystem.sftp;
 
 import java.util.Collection;
-import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.sshd.common.util.EventListenerUtils;
 
@@ -28,8 +28,7 @@ import org.apache.sshd.common.util.EventListenerUtils;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public abstract class AbstractSftpEventListenerManager implements 
SftpEventListenerManager {
-    private final Collection<SftpEventListener> sftpEventListeners =
-            EventListenerUtils.synchronizedListenersSet();
+    private final Collection<SftpEventListener> sftpEventListeners = new 
CopyOnWriteArraySet<>();
     private final SftpEventListener sftpEventListenerProxy;
 
     protected AbstractSftpEventListenerManager() {
@@ -48,11 +47,15 @@ public abstract class AbstractSftpEventListenerManager 
implements SftpEventListe
 
     @Override
     public boolean addSftpEventListener(SftpEventListener listener) {
-        return sftpEventListeners.add(Objects.requireNonNull(listener, "No 
listener"));
+        return 
sftpEventListeners.add(SftpEventListener.validateListener(listener));
     }
 
     @Override
     public boolean removeSftpEventListener(SftpEventListener listener) {
-        return sftpEventListeners.remove(listener);
+        if (listener == null) {
+            return false;
+        }
+
+        return 
sftpEventListeners.remove(SftpEventListener.validateListener(listener));
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpEventListener.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpEventListener.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpEventListener.java
index f5736c9..6af7199 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpEventListener.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpEventListener.java
@@ -376,4 +376,8 @@ public interface SftpEventListener extends 
SshdEventListener {
             throws IOException {
                 // ignored
     }
+
+    static <L extends SftpEventListener> L validateListener(L listener) {
+        return SshdEventListener.validateListener(listener, 
SftpEventListener.class.getSimpleName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
index 938e13e..8175520 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
@@ -69,6 +69,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -278,8 +279,7 @@ public class SftpSubsystem
 
     private ServerSession serverSession;
     private final AtomicBoolean closed = new AtomicBoolean(false);
-    private final Collection<SftpEventListener> sftpEventListeners =
-            EventListenerUtils.synchronizedListenersSet();
+    private final Collection<SftpEventListener> sftpEventListeners = new 
CopyOnWriteArraySet<>();
     private final SftpEventListener sftpEventListenerProxy;
     private final SftpFileSystemAccessor fileSystemAccessor;
 
@@ -328,12 +328,16 @@ public class SftpSubsystem
 
     @Override
     public boolean addSftpEventListener(SftpEventListener listener) {
-        return sftpEventListeners.add(Objects.requireNonNull(listener, "No 
listener"));
+        return 
sftpEventListeners.add(SftpEventListener.validateListener(listener));
     }
 
     @Override
     public boolean removeSftpEventListener(SftpEventListener listener) {
-        return sftpEventListeners.remove(listener);
+        if (listener == null) {
+            return false;
+        }
+
+        return 
sftpEventListeners.remove(SftpEventListener.validateListener(listener));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java 
b/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
index 1862472..d5ced50 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
@@ -62,6 +62,7 @@ public class ProxyTest extends BaseTestSupport {
     private int echoPort;
     private IoAcceptor acceptor;
     private SshClient client;
+
     @SuppressWarnings("checkstyle:anoninnerlength")
     private final PortForwardingEventListener serverSideListener = new 
PortForwardingEventListener() {
         private final Logger log = LoggerFactory.getLogger(ProxyTest.class);
@@ -118,6 +119,10 @@ public class ProxyTest extends BaseTestSupport {
         }
     };
 
+    public ProxyTest() {
+        super();
+    }
+
     @Before
     public void setUp() throws Exception {
         sshd = setupTestServer();
@@ -222,6 +227,11 @@ public class ProxyTest extends BaseTestSupport {
                 assertSame("Establishment indication not invoked", local, 
localAddressHolder.get());
                 assertNull("Multiple calls to establishment indicator", 
boundAddressHolder.getAndSet(boundAddress));
             }
+
+            @Override
+            public String toString() {
+                return getCurrentTestName();
+            }
         };
 
         try (ClientSession session = createNativeSession(listener)) {

Reply via email to