Repository: mina-sshd Updated Branches: refs/heads/master 6c92dcce8 -> 38c84a830
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index 97b5e4d..dc09e33 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -78,6 +79,7 @@ import org.apache.sshd.common.session.SessionListener; import org.apache.sshd.common.session.SessionWorkBuffer; import org.apache.sshd.common.util.EventListenerUtils; import org.apache.sshd.common.util.GenericUtils; +import org.apache.sshd.common.util.Invoker; import org.apache.sshd.common.util.NumberUtils; import org.apache.sshd.common.util.Pair; import org.apache.sshd.common.util.Readable; @@ -133,22 +135,19 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen /** * Session listeners container */ - protected final Collection<SessionListener> sessionListeners = - EventListenerUtils.synchronizedListenersSet(); + protected final Collection<SessionListener> sessionListeners = new CopyOnWriteArraySet<>(); protected final SessionListener sessionListenerProxy; /** * Channel events listener container */ - protected final Collection<ChannelListener> channelListeners = - EventListenerUtils.synchronizedListenersSet(); + protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>(); protected final ChannelListener channelListenerProxy; /** * Port forwarding events listener container */ - protected final Collection<PortForwardingEventListener> tunnelListeners = - EventListenerUtils.synchronizedListenersSet(); + protected final Collection<PortForwardingEventListener> tunnelListeners = new CopyOnWriteArraySet<>(); protected final PortForwardingEventListener tunnelListenerProxy; /* @@ -265,11 +264,36 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners); channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners); tunnelListenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, loader, tunnelListeners); + } + + protected void signalSessionCreated(IoSession ioSession) throws Exception { + try { + invokeSessionSignaller(l -> { + signalSessionCreated(l); + return null; + }); + } catch (Throwable err) { + Throwable e = GenericUtils.peelException(err); + if (log.isDebugEnabled()) { + log.debug("Failed ({}) to announce session={} created: {}", + e.getClass().getSimpleName(), ioSession, e.getMessage()); + } + if (log.isTraceEnabled()) { + log.trace("Session=" + ioSession + " creation failure details", e); + } + if (e instanceof Exception) { + throw (Exception) e; + } else { + throw new RuntimeSshException(e); + } + } + } - // Delegate the task of further notifications to the session - addSessionListener(factoryManager.getSessionListenerProxy()); - addChannelListener(factoryManager.getChannelListenerProxy()); - addPortForwardingEventListener(factoryManager.getPortForwardingEventListenerProxy()); + protected void signalSessionCreated(SessionListener listener) { + if (listener == null) { + return; + } + listener.sessionCreated(this); } /** @@ -404,7 +428,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen @Override public void setAuthenticated() throws IOException { this.authed = true; - sendSessionEvent(SessionListener.Event.Authenticated); + signalSessionEvent(SessionListener.Event.Authenticated); } /** @@ -736,7 +760,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen "Unknown negotiated KEX algorithm: %s", kexAlgorithm); kex.init(this, serverVersion.getBytes(StandardCharsets.UTF_8), clientVersion.getBytes(StandardCharsets.UTF_8), i_s, i_c); - sendSessionEvent(SessionListener.Event.KexCompleted); + signalSessionEvent(SessionListener.Event.KexCompleted); } protected void handleNewKeys(int cmd, Buffer buffer) throws Exception { @@ -756,7 +780,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen } } - sendSessionEvent(SessionListener.Event.KeyEstablished); + signalSessionEvent(SessionListener.Event.KeyEstablished); synchronized (pendingPackets) { if (!pendingPackets.isEmpty()) { if (log.isDebugEnabled()) { @@ -812,24 +836,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen log.debug("exceptionCaught(" + this + ")[state=" + curState + "] details", t); } - SessionListener listener = getSessionListenerProxy(); - try { - listener.sessionException(this, t); - } catch (Throwable err) { - Throwable e = GenericUtils.peelException(err); - if (log.isDebugEnabled()) { - log.debug("exceptionCaught(" + this + ") signal session exception details", e); - } - - if (log.isTraceEnabled()) { - Throwable[] suppressed = e.getSuppressed(); - if (GenericUtils.length(suppressed) > 0) { - for (Throwable s : suppressed) { - log.trace("exceptionCaught(" + this + ") suppressed session exception signalling", s); - } - } - } - } + signalExceptionCaught(t); if (State.Opened.equals(curState) && (t instanceof SshException)) { int code = ((SshException) t).getDisconnectCode(); @@ -852,6 +859,37 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen close(true); } + protected void signalExceptionCaught(Throwable t) { + try { + invokeSessionSignaller(l -> { + signalExceptionCaught(l, t); + return null; + }); + } catch (Throwable err) { + Throwable e = GenericUtils.peelException(err); + if (log.isDebugEnabled()) { + log.debug("exceptionCaught(" + this + ") signal session exception details", e); + } + + if (log.isTraceEnabled()) { + Throwable[] suppressed = e.getSuppressed(); + if (GenericUtils.length(suppressed) > 0) { + for (Throwable s : suppressed) { + log.trace("exceptionCaught(" + this + ") suppressed session exception signalling", s); + } + } + } + } + } + + protected void signalExceptionCaught(SessionListener listener, Throwable t) { + if (listener == null) { + return; + } + + listener.sessionException(this, t); + } + @Override protected Closeable getInnerCloseable() { return builder() @@ -880,32 +918,48 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen } // Fire 'close' event - SessionListener listener = getSessionListenerProxy(); try { - listener.sessionClosed(this); - } catch (Throwable t) { - Throwable e = GenericUtils.peelException(t); - log.warn("preClose({}) {} while signal session closed: {}", this, e.getClass().getSimpleName(), e.getMessage()); + signalSessionClosed(); + } finally { + // clear the listeners since we are closing the session (quicker GC) + this.sessionListeners.clear(); + this.channelListeners.clear(); + this.tunnelListeners.clear(); + } + + super.preClose(); + } + + protected void signalSessionClosed() { + try { + invokeSessionSignaller(l -> { + signalSessionClosed(l); + return null; + }); + } catch (Throwable err) { + Throwable e = GenericUtils.peelException(err); + log.warn("signalSessionClosed({}) {} while signal session closed: {}", this, e.getClass().getSimpleName(), e.getMessage()); if (log.isDebugEnabled()) { - log.debug("preClose(" + this + ") signal session closed exception details", e); + log.debug("signalSessionClosed(" + this + ") signal session closed exception details", e); } if (log.isTraceEnabled()) { Throwable[] suppressed = e.getSuppressed(); if (GenericUtils.length(suppressed) > 0) { for (Throwable s : suppressed) { - log.trace("preClose(" + this + ") suppressed session closed signalling", s); + log.trace("signalSessionClosed(" + this + ") suppressed session closed signalling", s); } } } - } finally { - // clear the listeners since we are closing the session (quicker GC) - this.sessionListeners.clear(); - this.channelListeners.clear(); - this.tunnelListeners.clear(); } + } - super.preClose(); + protected void signalSessionClosed(SessionListener listener) { + if (listener == null) { + return; + } + + listener.sessionClosed(this); } protected List<Service> getServices() { @@ -1866,10 +1920,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen * @return The negotiated options {@link Map} */ protected Map<KexProposalOption, String> negotiate() { - SessionListener listener = getSessionListenerProxy(); Map<KexProposalOption, String> c2sOptions = Collections.unmodifiableMap(clientProposal); Map<KexProposalOption, String> s2cOptions = Collections.unmodifiableMap(serverProposal); - listener.sessionNegotiationStart(this, c2sOptions, s2cOptions); + signalNegotiationStart(c2sOptions, s2cOptions); Map<KexProposalOption, String> guess = new EnumMap<>(KexProposalOption.class); Map<KexProposalOption, String> negotiatedGuess = Collections.unmodifiableMap(guess); @@ -1914,14 +1967,69 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen } } } catch (RuntimeException | Error e) { - listener.sessionNegotiationEnd(this, c2sOptions, s2cOptions, negotiatedGuess, e); + signalNegotiationEnd(c2sOptions, s2cOptions, negotiatedGuess, e); throw e; } - listener.sessionNegotiationEnd(this, c2sOptions, s2cOptions, negotiatedGuess, null); + signalNegotiationEnd(c2sOptions, s2cOptions, negotiatedGuess, null); return setNegotiationResult(guess); } + protected void signalNegotiationStart(Map<KexProposalOption, String> c2sOptions, Map<KexProposalOption, String> s2cOptions) { + try { + invokeSessionSignaller(l -> { + signalNegotiationStart(l, c2sOptions, s2cOptions); + return null; + }); + } catch (Throwable err) { + if (err instanceof RuntimeException) { + throw (RuntimeException) err; + } else if (err instanceof Error) { + throw (Error) err; + } else { + throw new RuntimeException(err); + } + } + } + + protected void signalNegotiationStart( + SessionListener listener, Map<KexProposalOption, String> c2sOptions, Map<KexProposalOption, String> s2cOptions) { + if (listener == null) { + return; + } + + listener.sessionNegotiationStart(this, c2sOptions, s2cOptions); + } + + protected void signalNegotiationEnd( + Map<KexProposalOption, String> c2sOptions, Map<KexProposalOption, String> s2cOptions, + Map<KexProposalOption, String> negotiatedGuess, Throwable reason) { + try { + invokeSessionSignaller(l -> { + signalNegotiationEnd(l, c2sOptions, s2cOptions, negotiatedGuess, reason); + return null; + }); + } catch (Throwable err) { + if (err instanceof RuntimeException) { + throw (RuntimeException) err; + } else if (err instanceof Error) { + throw (Error) err; + } else { + throw new RuntimeException(err); + } + } + } + + protected void signalNegotiationEnd(SessionListener listener, + Map<KexProposalOption, String> c2sOptions, Map<KexProposalOption, String> s2cOptions, + Map<KexProposalOption, String> negotiatedGuess, Throwable reason) { + if (listener == null) { + return; + } + + listener.sessionNegotiationEnd(this, c2sOptions, s2cOptions, negotiatedGuess, null); + } + protected Map<KexProposalOption, String> setNegotiationResult(Map<KexProposalOption, String> guess) { synchronized (negotiationResult) { if (!negotiationResult.isEmpty()) { @@ -2026,7 +2134,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen @Override public void addSessionListener(SessionListener listener) { - ValidateUtils.checkNotNull(listener, "addSessionListener(%s) null instance", this); + SessionListener.validateListener(listener); // avoid race conditions on notifications while session is being closed if (!isOpen()) { log.warn("addSessionListener({})[{}] ignore registration while session is closing", this, listener); @@ -2046,6 +2154,11 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen @Override public void removeSessionListener(SessionListener listener) { + if (listener == null) { + return; + } + + SessionListener.validateListener(listener); if (this.sessionListeners.remove(listener)) { if (log.isTraceEnabled()) { log.trace("removeSessionListener({})[{}] removed", this, listener); @@ -2064,7 +2177,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen @Override public void addChannelListener(ChannelListener listener) { - ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this); + ChannelListener.validateListener(listener); // avoid race conditions on notifications while session is being closed if (!isOpen()) { log.warn("addChannelListener({})[{}] ignore registration while session is closing", this, listener); @@ -2084,6 +2197,11 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen @Override public void removeChannelListener(ChannelListener listener) { + if (listener == null) { + return; + } + + ChannelListener.validateListener(listener); if (this.channelListeners.remove(listener)) { if (log.isTraceEnabled()) { log.trace("removeChannelListener({})[{}] removed", this, listener); @@ -2107,7 +2225,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen @Override public void addPortForwardingEventListener(PortForwardingEventListener listener) { - ValidateUtils.checkNotNull(listener, "addPortForwardingEventListener(%s) null instance", this); + PortForwardingEventListener.validateListener(listener); // avoid race conditions on notifications while session is being closed if (!isOpen()) { log.warn("addPortForwardingEventListener({})[{}] ignore registration while session is closing", this, listener); @@ -2131,6 +2249,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen return; } + PortForwardingEventListener.validateListener(listener); if (this.tunnelListeners.remove(listener)) { if (log.isTraceEnabled()) { log.trace("removePortForwardingEventListener({})[{}] removed", this, listener); @@ -2148,12 +2267,14 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen * @param event The event to send * @throws IOException If any of the registered listeners threw an exception. */ - protected void sendSessionEvent(SessionListener.Event event) throws IOException { - SessionListener listener = getSessionListenerProxy(); + protected void signalSessionEvent(SessionListener.Event event) throws IOException { try { - listener.sessionEvent(this, event); - } catch (Throwable e) { - Throwable t = GenericUtils.peelException(e); + invokeSessionSignaller(l -> { + signalSessionEvent(l, event); + return null; + }); + } catch (Throwable err) { + Throwable t = GenericUtils.peelException(err); if (log.isDebugEnabled()) { log.debug("sendSessionEvent({})[{}] failed ({}) to inform listeners: {}", this, event, t.getClass().getSimpleName(), t.getMessage()); @@ -2171,6 +2292,37 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen } } + protected void signalSessionEvent(SessionListener listener, SessionListener.Event event) throws IOException { + if (listener == null) { + return; + } + + listener.sessionEvent(this, event); + } + + protected void invokeSessionSignaller(Invoker<SessionListener, Void> invoker) throws Throwable { + FactoryManager manager = getFactoryManager(); + SessionListener[] listeners = { + (manager == null) ? null : manager.getSessionListenerProxy(), + getSessionListenerProxy() + }; + Throwable err = null; + for (SessionListener l : listeners) { + if (l == null) { + continue; + } + try { + invoker.invoke(l); + } catch (Throwable t) { + err = GenericUtils.accumulateException(err, t); + } + } + + if (err != null) { + throw err; + } + } + @Override public KeyExchangeFuture reExchangeKeys() throws IOException { requestNewKeysExchange(); @@ -2224,6 +2376,10 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen } protected boolean isRekeyRequired() { + if ((!isOpen()) || isClosing() || isClosed()) { + return false; + } + KexState curState = kexState.get(); if (!KexState.DONE.equals(curState)) { return false; @@ -2388,7 +2544,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen * @see #checkIdleTimeout(long, long) */ protected void checkForTimeouts() throws IOException { - if (isClosing()) { + if ((!isOpen()) || isClosing() || isClosed()) { if (log.isDebugEnabled()) { log.debug("checkForTimeouts({}) session closing", this); return; http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java index 3304766..1620bad 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.sshd.common.util; import java.lang.reflect.Proxy; @@ -28,14 +27,14 @@ import java.util.Objects; import java.util.Set; import java.util.TreeSet; - /** * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public final class EventListenerUtils { /** * A special "comparator" whose only purpose is to ensure - * there are no same references in a listener's set + * there are no same references in a listener's set - to be used + * in conjunction with a {@code TreeSet} as its comparator */ @SuppressWarnings("checkstyle:anoninnerlength") public static final Comparator<EventListener> LISTENER_INSTANCE_COMPARATOR = (l1, l2) -> { @@ -90,14 +89,14 @@ public final class EventListenerUtils { } /** - * @param <L> Type of {@link EventListener} contained in the set + * @param <L> Type of {@link SshdEventListener} contained in the set * @param listeners The listeners to pre-add to the create set - ignored * if (@code null}/empty * @return A (synchronized) {@link Set} for containing the listeners ensuring * that if same listener instance is added repeatedly only <U>one</U> * instance is actually contained */ - public static <L extends EventListener> Set<L> synchronizedListenersSet(Collection<? extends L> listeners) { + public static <L extends SshdEventListener> Set<L> synchronizedListenersSet(Collection<? extends L> listeners) { Set<L> s = EventListenerUtils.synchronizedListenersSet(); if (GenericUtils.size(listeners) > 0) { s.addAll(listeners); @@ -107,14 +106,14 @@ public final class EventListenerUtils { } /** - * @param <L> Type of {@link EventListener} contained in the set + * @param <L> Type of {@link SshdEventListener} contained in the set * @return A (synchronized) {@link Set} for containing the listeners ensuring * that if same listener instance is added repeatedly only <U>one</U> * instance is actually contained * @see #LISTENER_INSTANCE_COMPARATOR */ - public static <L extends EventListener> Set<L> synchronizedListenersSet() { - return Collections.synchronizedSet(new TreeSet<>(LISTENER_INSTANCE_COMPARATOR)); + public static <L extends SshdEventListener> Set<L> synchronizedListenersSet() { + return Collections.synchronizedSet(new TreeSet<L>(LISTENER_INSTANCE_COMPARATOR)); } /** @@ -148,7 +147,7 @@ public final class EventListenerUtils { * the calls to the container * @see #proxyWrapper(Class, ClassLoader, Iterable) */ - public static <T extends EventListener> T proxyWrapper(Class<T> listenerType, Iterable<? extends T> listeners) { + public static <T extends SshdEventListener> T proxyWrapper(Class<T> listenerType, Iterable<? extends T> listeners) { return proxyWrapper(listenerType, listenerType.getClassLoader(), listeners); } @@ -157,7 +156,7 @@ public final class EventListenerUtils { * interface implementation. <b>Note:</b> a listener interface is one whose * invoked methods return <u>only</u> {@code void}. * - * @param <T> Generic listener type + * @param <T> Generic {@link SshdEventListener} type * @param listenerType The expected listener <u>interface</u> * @param loader The {@link ClassLoader} to use for the proxy * @param listeners An {@link Iterable} container of listeners to be invoked. @@ -186,7 +185,7 @@ public final class EventListenerUtils { * or a {@code null} container has been provided * @see #proxyWrapper(Class, ClassLoader, Iterable) */ - public static <T extends EventListener> T proxyWrapper(Class<T> listenerType, ClassLoader loader, final Iterable<? extends T> listeners) { + public static <T extends SshdEventListener> T proxyWrapper(Class<T> listenerType, ClassLoader loader, final Iterable<? extends T> listeners) { Objects.requireNonNull(listenerType, "No listener type specified"); ValidateUtils.checkTrue(listenerType.isInterface(), "Target proxy is not an interface: %s", listenerType.getSimpleName()); Objects.requireNonNull(listeners, "No listeners container provided"); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/util/Invoker.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/Invoker.java b/sshd-core/src/main/java/org/apache/sshd/common/util/Invoker.java new file mode 100644 index 0000000..39f4375 --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/util/Invoker.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.util; + +import java.util.Collection; + +/** + * The complement to the {@code Callable} interface - accepts one argument + * and possibly throws somethind + * + * @param <ARG> Argument type + * @param <RET> Return type + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +@FunctionalInterface +public interface Invoker<ARG, RET> { + RET invoke(ARG arg) throws Throwable; + + static <ARG> Invoker<ARG, Void> wrapAll(Collection<? extends Invoker<? super ARG, ?>> invokers) { + return arg -> { + invokeAll(arg, invokers); + return null; + }; + } + + /** + * Invokes <U>all</U> the instances ignoring the return value. Any + * intermediate exceptions are accumulated and thrown at the end. + * + * @param <ARG> Argument type + * @param arg The argument to pass to the {@link #invoke(Object)} method + * @param invokers The invokers to scan - ignored if {@code null}/empty + * (also ignores {@code null} members) + */ + static <ARG> void invokeAll(ARG arg, Collection<? extends Invoker<? super ARG, ?>> invokers) throws Throwable { + if (GenericUtils.isEmpty(invokers)) { + return; + } + + Throwable err = null; + for (Invoker<? super ARG, ?> i : invokers) { + if (i == null) { + continue; + } + + try { + i.invoke(arg); + } catch (Throwable t) { + err = GenericUtils.accumulateException(err, t); + } + } + + if (err != null) { + throw err; + } + } + + static <ARG> Invoker<ARG, Void> wrapFirst(Collection<? extends Invoker<? super ARG, ?>> invokers) { + return arg -> { + Pair<Invoker<? super ARG, ?>, Throwable> result = invokeTillFirstFailure(arg, invokers); + if (result != null) { + throw result.getValue(); + } + return null; + }; + } + + /** + * Invokes all instances until 1st failure (if any) + * + * @param <ARG> Argument type + * @param arg The argument to pass to the {@link #invoke(Object)} method + * @param invokers The invokers to scan - ignored if {@code null}/empty + * (also ignores {@code null} members) + * @return A {@link Pair} representing the <U>first</U> failed invocation + * - {@code null} if all were successful (or none invoked). + */ + static <ARG> Pair<Invoker<? super ARG, ?>, Throwable> invokeTillFirstFailure(ARG arg, Collection<? extends Invoker<? super ARG, ?>> invokers) { + if (GenericUtils.isEmpty(invokers)) { + return null; + } + + for (Invoker<? super ARG, ?> i : invokers) { + if (i == null) { + continue; + } + + try { + i.invoke(arg); + } catch (Throwable t) { + return new Pair<>(i, t); + } + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/common/util/SshdEventListener.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/SshdEventListener.java b/sshd-core/src/main/java/org/apache/sshd/common/util/SshdEventListener.java index 16dfc9f..ea6a3dd 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/util/SshdEventListener.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/util/SshdEventListener.java @@ -19,11 +19,26 @@ package org.apache.sshd.common.util; +import java.lang.reflect.Proxy; import java.util.EventListener; +import java.util.Objects; /** * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public interface SshdEventListener extends EventListener { - // marker interface for quickly locating our session listeners + + /** + * Makes sure that the listener is neither {@code null} nor a proxy + * + * @param <L> Type of {@link SshdEventListener} being validation + * @param listener The listener instance + * @param prefix Prefix text to be prepended to validation failure messages + * @return The validated instance + */ + static <L extends SshdEventListener> L validateListener(L listener, String prefix) { + Objects.requireNonNull(listener, prefix + ": no instance"); + ValidateUtils.checkTrue(!Proxy.isProxyClass(listener.getClass()), prefix + ": proxies N/A"); + return listener; + } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java b/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java index 8c29966..2675e5c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java @@ -32,4 +32,8 @@ public interface SignalListener extends SshdEventListener { * @param signal The received {@link Signal} */ void signal(Signal signal); + + static <L extends SignalListener> L validateListener(L listener) { + return SshdEventListener.validateListener(listener, SignalListener.class.getSimpleName()); + } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java b/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java index b202659..8c67a43 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java @@ -21,11 +21,10 @@ package org.apache.sshd.server; import java.util.Arrays; import java.util.Collection; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import org.apache.sshd.common.channel.PtyMode; -import org.apache.sshd.common.util.EventListenerUtils; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.logging.AbstractLoggingBean; @@ -61,7 +60,7 @@ public class StandardEnvironment extends AbstractLoggingBean implements Environm */ @Override public void addSignalListener(SignalListener listener, Collection<Signal> signals) { - Objects.requireNonNull(listener, "No listener instance"); + SignalListener.validateListener(listener); ValidateUtils.checkNotNullAndNotEmpty(signals, "No signals"); for (Signal s : signals) { @@ -81,7 +80,11 @@ public class StandardEnvironment extends AbstractLoggingBean implements Environm @Override public void removeSignalListener(SignalListener listener) { - Objects.requireNonNull(listener, "No listener instance"); + if (listener == null) { + return; + } + + SignalListener.validateListener(listener); for (Signal s : Signal.SIGNALS) { Collection<SignalListener> ls = getSignalListeners(s, false); if (ls != null) { @@ -141,7 +144,7 @@ public class StandardEnvironment extends AbstractLoggingBean implements Environm synchronized (listeners) { ls = listeners.get(signal); if (ls == null) { - ls = EventListenerUtils.synchronizedListenersSet(); + ls = new CopyOnWriteArraySet<>(); listeners.put(signal, ls); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java index 35f869f..bd8e083 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java @@ -30,7 +30,6 @@ import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.AbstractChannel; import org.apache.sshd.common.channel.Channel; -import org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.channel.RequestHandler; import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.session.Session; @@ -87,32 +86,18 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } protected OpenFuture doInit(Buffer buffer) { - ChannelListener listener = getChannelListenerProxy(); OpenFuture f = new DefaultOpenFuture(this); + String changeEvent = "doInit"; try { - listener.channelOpenSuccess(this); + signalChannelOpenSuccess(); f.setOpened(); } catch (Throwable t) { Throwable e = GenericUtils.peelException(t); - try { - listener.channelOpenFailure(this, e); - } catch (Throwable err) { - Throwable ignored = GenericUtils.peelException(err); - log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}", - this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); - if (log.isDebugEnabled()) { - log.debug("doInit(" + this + ") listener inform failure details", ignored); - } - if (log.isTraceEnabled()) { - Throwable[] suppressed = ignored.getSuppressed(); - if (GenericUtils.length(suppressed) > 0) { - for (Throwable s : suppressed) { - log.trace("doInit(" + this + ") suppressed channel open failure signalling", s); - } - } - } - } + changeEvent = e.getClass().getSimpleName(); + signalChannelOpenFailure(e); f.setException(e); + } finally { + notifyStateChanged(changeEvent); } return f; http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java index d33c47b..5ddb601 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java @@ -32,7 +32,6 @@ import org.apache.sshd.common.RuntimeSshException; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelFactory; -import org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.channel.OpenChannelException; import org.apache.sshd.common.channel.Window; @@ -192,7 +191,6 @@ public class TcpipServerChannel extends AbstractServerChannel { } protected void handleChannelConnectResult(OpenFuture f, IoConnectFuture future) { - ChannelListener listener = getChannelListenerProxy(); try { if (future.isConnected()) { handleChannelOpenSuccess(f, future.getSession()); @@ -205,80 +203,35 @@ public class TcpipServerChannel extends AbstractServerChannel { } } catch (RuntimeException t) { Throwable e = GenericUtils.peelException(t); + signalChannelOpenFailure(e); try { - listener.channelOpenFailure(this, e); - } catch (Throwable err) { - Throwable ignored = GenericUtils.peelException(err); - log.warn("handleChannelConnectResult({})[exception] failed ({}) to inform listener of open failure={}: {}", - this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); - if (log.isDebugEnabled()) { - log.debug("handleChannelConnectResult(" + this + ")[exception] listener exception details", ignored); - } - if (log.isTraceEnabled()) { - Throwable[] suppressed = ignored.getSuppressed(); - if (GenericUtils.length(suppressed) > 0) { - for (Throwable s : suppressed) { - log.trace("handleChannelConnectResult(" + this + ") suppressed channel open failure signalling", s); - } - } - } + f.setException(e); + } finally { + notifyStateChanged(e.getClass().getSimpleName()); } - f.setException(e); } } protected void handleChannelOpenSuccess(OpenFuture f, IoSession session) { ioSession = session; - ChannelListener listener = getChannelListenerProxy(); + String changeEvent = session.toString(); try { - listener.channelOpenSuccess(this); + signalChannelOpenSuccess(); f.setOpened(); } catch (Throwable t) { Throwable e = GenericUtils.peelException(t); - try { - listener.channelOpenFailure(this, e); - } catch (Throwable err) { - Throwable ignored = GenericUtils.peelException(err); - log.warn("handleChannelOpenSuccess({}) failed ({}) to inform listener of open failure={}: {}", - this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); - if (log.isDebugEnabled()) { - log.debug("doInit(" + this + ") listener inform failure details", ignored); - } - if (log.isTraceEnabled()) { - Throwable[] suppressed = ignored.getSuppressed(); - if (GenericUtils.length(suppressed) > 0) { - for (Throwable s : suppressed) { - log.trace("handleChannelOpenSuccess(" + this + ") suppressed channel open failure signalling", s); - } - } - } - } + changeEvent = e.getClass().getSimpleName(); + signalChannelOpenFailure(e); f.setException(e); + } finally { + notifyStateChanged(changeEvent); } } protected void handleChannelOpenFailure(OpenFuture f, Throwable problem) { - ChannelListener listener = getChannelListenerProxy(); - try { - listener.channelOpenFailure(this, problem); - } catch (Throwable err) { - Throwable ignored = GenericUtils.peelException(err); - log.warn("handleChannelOpenFailure({}) failed ({}) to inform listener of open failure={}: {}", - this, ignored.getClass().getSimpleName(), problem.getClass().getSimpleName(), ignored.getMessage()); - if (log.isDebugEnabled()) { - log.debug("handleChannelOpenFailure(" + this + ") listener inform open failure details", ignored); - } - if (log.isTraceEnabled()) { - Throwable[] suppressed = ignored.getSuppressed(); - if (GenericUtils.length(suppressed) > 0) { - for (Throwable s : suppressed) { - log.trace("handleOpenChannelFailure(" + this + ") suppressed channel open failure signalling", s); - } - } - } - } - + signalChannelOpenFailure(problem); + notifyStateChanged(problem.getClass().getSimpleName()); closeImmediately0(); if (problem instanceof ConnectException) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java index d53b666..67549b5 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java @@ -19,6 +19,7 @@ package org.apache.sshd.server.scp; import java.util.Collection; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import org.apache.sshd.common.scp.ScpFileOpener; @@ -106,8 +107,7 @@ public class ScpCommandFactory implements ScpFileOpenerHolder, CommandFactory, C private ScpFileOpener fileOpener; private int sendBufferSize = ScpHelper.MIN_SEND_BUFFER_SIZE; private int receiveBufferSize = ScpHelper.MIN_RECEIVE_BUFFER_SIZE; - private Collection<ScpTransferEventListener> listeners = - EventListenerUtils.synchronizedListenersSet(); + private Collection<ScpTransferEventListener> listeners = new CopyOnWriteArraySet<>(); private ScpTransferEventListener listenerProxy; public ScpCommandFactory() { @@ -258,7 +258,7 @@ public class ScpCommandFactory implements ScpFileOpenerHolder, CommandFactory, C try { ScpCommandFactory other = getClass().cast(super.clone()); // clone the listeners set as well - other.listeners = EventListenerUtils.synchronizedListenersSet(this.listeners); + other.listeners = new CopyOnWriteArraySet<>(this.listeners); other.listenerProxy = EventListenerUtils.proxyWrapper(ScpTransferEventListener.class, getClass().getClassLoader(), other.listeners); return other; } catch (CloneNotSupportedException e) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java index 6b18e23..c2e7b9c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java @@ -41,6 +41,11 @@ public class ServerConnectionServiceFactory extends AbstractConnectionServiceFac public void removePortForwardingEventListener(PortForwardingEventListener listener) { throw new UnsupportedOperationException("removePortForwardingEventListener(" + listener + ") N/A on default instance"); } + + @Override + public PortForwardingEventListener getPortForwardingEventListenerProxy() { + return PortForwardingEventListener.EMPTY; + } }; public ServerConnectionServiceFactory() { @@ -56,7 +61,7 @@ public class ServerConnectionServiceFactory extends AbstractConnectionServiceFac public Service create(Session session) throws IOException { AbstractServerSession abstractSession = ValidateUtils.checkInstanceOf(session, AbstractServerSession.class, "Not a server session: %s", session); ServerConnectionService service = new ServerConnectionService(abstractSession); - service.addPortForwardingEventListener(getPortForwardingEventListenerProxy()); + service.addPortForwardingEventListenerManager(this); return service; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java index a35734e..8ff2810 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java @@ -19,9 +19,7 @@ package org.apache.sshd.server.session; import org.apache.sshd.common.PropertyResolverUtils; -import org.apache.sshd.common.RuntimeSshException; import org.apache.sshd.common.io.IoSession; -import org.apache.sshd.common.session.SessionListener; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.server.ServerFactoryManager; @@ -33,30 +31,7 @@ import org.apache.sshd.server.ServerFactoryManager; public class ServerSessionImpl extends AbstractServerSession { public ServerSessionImpl(ServerFactoryManager server, IoSession ioSession) throws Exception { super(server, ioSession); - - if (log.isDebugEnabled()) { - log.debug("Server session created {}", ioSession); - } - - // Inform the listener of the newly created session - SessionListener listener = getSessionListenerProxy(); - try { - listener.sessionCreated(this); - } catch (Throwable t) { - Throwable e = GenericUtils.peelException(t); - if (log.isDebugEnabled()) { - log.debug("Failed ({}) to announce session={} created: {}", - e.getClass().getSimpleName(), ioSession, e.getMessage()); - } - if (log.isTraceEnabled()) { - log.trace("Session=" + ioSession + " creation failure details", e); - } - if (e instanceof Exception) { - throw (Exception) e; - } else { - throw new RuntimeSshException(e); - } - } + signalSessionCreated(ioSession); String headerConfig = PropertyResolverUtils.getString(this, ServerFactoryManager.SERVER_EXTRA_IDENTIFICATION_LINES); String[] headers = GenericUtils.split(headerConfig, ServerFactoryManager.SERVER_EXTRA_IDENT_LINES_SEPARATOR); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java index 15de46a..9289458 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java @@ -20,7 +20,7 @@ package org.apache.sshd.server.subsystem.sftp; import java.util.Collection; -import java.util.Objects; +import java.util.concurrent.CopyOnWriteArraySet; import org.apache.sshd.common.util.EventListenerUtils; @@ -28,8 +28,7 @@ import org.apache.sshd.common.util.EventListenerUtils; * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public abstract class AbstractSftpEventListenerManager implements SftpEventListenerManager { - private final Collection<SftpEventListener> sftpEventListeners = - EventListenerUtils.synchronizedListenersSet(); + private final Collection<SftpEventListener> sftpEventListeners = new CopyOnWriteArraySet<>(); private final SftpEventListener sftpEventListenerProxy; protected AbstractSftpEventListenerManager() { @@ -48,11 +47,15 @@ public abstract class AbstractSftpEventListenerManager implements SftpEventListe @Override public boolean addSftpEventListener(SftpEventListener listener) { - return sftpEventListeners.add(Objects.requireNonNull(listener, "No listener")); + return sftpEventListeners.add(SftpEventListener.validateListener(listener)); } @Override public boolean removeSftpEventListener(SftpEventListener listener) { - return sftpEventListeners.remove(listener); + if (listener == null) { + return false; + } + + return sftpEventListeners.remove(SftpEventListener.validateListener(listener)); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpEventListener.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpEventListener.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpEventListener.java index f5736c9..6af7199 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpEventListener.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpEventListener.java @@ -376,4 +376,8 @@ public interface SftpEventListener extends SshdEventListener { throws IOException { // ignored } + + static <L extends SftpEventListener> L validateListener(L listener) { + return SshdEventListener.validateListener(listener, SftpEventListener.class.getSimpleName()); + } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java index 938e13e..8175520 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java @@ -69,6 +69,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -278,8 +279,7 @@ public class SftpSubsystem private ServerSession serverSession; private final AtomicBoolean closed = new AtomicBoolean(false); - private final Collection<SftpEventListener> sftpEventListeners = - EventListenerUtils.synchronizedListenersSet(); + private final Collection<SftpEventListener> sftpEventListeners = new CopyOnWriteArraySet<>(); private final SftpEventListener sftpEventListenerProxy; private final SftpFileSystemAccessor fileSystemAccessor; @@ -328,12 +328,16 @@ public class SftpSubsystem @Override public boolean addSftpEventListener(SftpEventListener listener) { - return sftpEventListeners.add(Objects.requireNonNull(listener, "No listener")); + return sftpEventListeners.add(SftpEventListener.validateListener(listener)); } @Override public boolean removeSftpEventListener(SftpEventListener listener) { - return sftpEventListeners.remove(listener); + if (listener == null) { + return false; + } + + return sftpEventListeners.remove(SftpEventListener.validateListener(listener)); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/38c84a83/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java b/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java index 1862472..d5ced50 100644 --- a/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java @@ -62,6 +62,7 @@ public class ProxyTest extends BaseTestSupport { private int echoPort; private IoAcceptor acceptor; private SshClient client; + @SuppressWarnings("checkstyle:anoninnerlength") private final PortForwardingEventListener serverSideListener = new PortForwardingEventListener() { private final Logger log = LoggerFactory.getLogger(ProxyTest.class); @@ -118,6 +119,10 @@ public class ProxyTest extends BaseTestSupport { } }; + public ProxyTest() { + super(); + } + @Before public void setUp() throws Exception { sshd = setupTestServer(); @@ -222,6 +227,11 @@ public class ProxyTest extends BaseTestSupport { assertSame("Establishment indication not invoked", local, localAddressHolder.get()); assertNull("Multiple calls to establishment indicator", boundAddressHolder.getAndSet(boundAddress)); } + + @Override + public String toString() { + return getCurrentTestName(); + } }; try (ClientSession session = createNativeSession(listener)) {