[SSHD-766] Separate forwarding filter functionality according to sshd-config options
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/aa551bc0 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/aa551bc0 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/aa551bc0 Branch: refs/heads/master Commit: aa551bc0ed07430ee768e98a57b75cd56f3927e0 Parents: 306bef2 Author: Goldstein Lyor <l...@c-b4.com> Authored: Mon Aug 28 15:12:26 2017 +0300 Committer: Lyor Goldstein <lyor.goldst...@gmail.com> Committed: Mon Aug 28 19:34:56 2017 +0300 ---------------------------------------------------------------------- .../java/org/apache/sshd/client/SshClient.java | 2 +- .../client/session/AbstractClientSession.java | 24 +- .../org/apache/sshd/common/BaseBuilder.java | 20 +- .../org/apache/sshd/common/FactoryManager.java | 25 +- .../common/config/AllowTcpForwardingValue.java | 63 -- .../sshd/common/config/SshConfigFileReader.java | 13 - .../common/forward/DefaultForwarderFactory.java | 82 ++ .../common/forward/DefaultForwardingFilter.java | 1003 +++++++++++++++++ .../common/forward/DefaultTcpipForwarder.java | 1007 ------------------ .../forward/DefaultTcpipForwarderFactory.java | 82 -- .../sshd/common/forward/ForwardingFilter.java | 59 + .../common/forward/ForwardingFilterFactory.java | 37 + .../sshd/common/forward/TcpipForwarder.java | 54 - .../common/forward/TcpipForwarderFactory.java | 37 - .../common/helpers/AbstractFactoryManager.java | 22 +- .../sshd/common/session/ConnectionService.java | 8 +- .../helpers/AbstractConnectionService.java | 32 +- .../java/org/apache/sshd/server/SshServer.java | 56 +- .../sshd/server/channel/ChannelSession.java | 7 +- .../server/config/AllowTcpForwardingValue.java | 106 ++ .../config/SshServerConfigFileReader.java | 111 ++ .../server/forward/AgentForwardingFilter.java | 51 + .../sshd/server/forward/ForwardingFilter.java | 162 +-- .../server/forward/TcpForwardingFilter.java | 162 +++ .../sshd/server/forward/TcpipServerChannel.java | 4 +- .../server/forward/X11ForwardingFilter.java | 51 + .../global/CancelTcpipForwardHandler.java | 4 +- .../sshd/server/global/TcpipForwardHandler.java | 4 +- .../test/java/org/apache/sshd/ProxyTest.java | 4 +- .../java/org/apache/sshd/agent/AgentTest.java | 4 +- .../client/ClientAuthenticationManagerTest.java | 4 +- .../forward/ApacheServerApacheClientTest.java | 4 +- .../forward/ApacheServerJSchClientTest.java | 2 +- .../common/forward/PortForwardingLoadTest.java | 2 +- .../sshd/common/forward/PortForwardingTest.java | 18 +- .../server/subsystem/sftp/SshFsMounter.java | 2 +- 36 files changed, 1808 insertions(+), 1520 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java b/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java index c8cc1db..e4d37ab 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java @@ -378,7 +378,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa protected void checkConfig() { super.checkConfig(); - Objects.requireNonNull(getTcpipForwarderFactory(), "TcpipForwarderFactory not set"); + Objects.requireNonNull(getForwarderFactory(), "ForwarderFactory not set"); Objects.requireNonNull(getServerKeyVerifier(), "ServerKeyVerifier not set"); Objects.requireNonNull(getHostConfigEntryResolver(), "HostConfigEntryResolver not set"); Objects.requireNonNull(getClientIdentityLoader(), "ClientIdentityLoader not set"); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java b/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java index 9e85843..4e9e6c9 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java @@ -58,7 +58,7 @@ import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.cipher.BuiltinCiphers; import org.apache.sshd.common.cipher.CipherNone; import org.apache.sshd.common.config.keys.KeyUtils; -import org.apache.sshd.common.forward.TcpipForwarder; +import org.apache.sshd.common.forward.ForwardingFilter; import org.apache.sshd.common.future.DefaultKeyExchangeFuture; import org.apache.sshd.common.future.KeyExchangeFuture; import org.apache.sshd.common.io.IoSession; @@ -388,37 +388,43 @@ public abstract class AbstractClientSession extends AbstractSession implements C @Override public SshdSocketAddress startLocalPortForwarding(SshdSocketAddress local, SshdSocketAddress remote) throws IOException { - return getTcpipForwarder().startLocalPortForwarding(local, remote); + ForwardingFilter filter = getForwardingFilter(); + return filter.startLocalPortForwarding(local, remote); } @Override public void stopLocalPortForwarding(SshdSocketAddress local) throws IOException { - getTcpipForwarder().stopLocalPortForwarding(local); + ForwardingFilter filter = getForwardingFilter(); + filter.stopLocalPortForwarding(local); } @Override public SshdSocketAddress startRemotePortForwarding(SshdSocketAddress remote, SshdSocketAddress local) throws IOException { - return getTcpipForwarder().startRemotePortForwarding(remote, local); + ForwardingFilter filter = getForwardingFilter(); + return filter.startRemotePortForwarding(remote, local); } @Override public void stopRemotePortForwarding(SshdSocketAddress remote) throws IOException { - getTcpipForwarder().stopRemotePortForwarding(remote); + ForwardingFilter filter = getForwardingFilter(); + filter.stopRemotePortForwarding(remote); } @Override public SshdSocketAddress startDynamicPortForwarding(SshdSocketAddress local) throws IOException { - return getTcpipForwarder().startDynamicPortForwarding(local); + ForwardingFilter filter = getForwardingFilter(); + return filter.startDynamicPortForwarding(local); } @Override public void stopDynamicPortForwarding(SshdSocketAddress local) throws IOException { - getTcpipForwarder().stopDynamicPortForwarding(local); + ForwardingFilter filter = getForwardingFilter(); + filter.stopDynamicPortForwarding(local); } - protected TcpipForwarder getTcpipForwarder() { + protected ForwardingFilter getForwardingFilter() { ConnectionService service = Objects.requireNonNull(getConnectionService(), "No connection service"); - return Objects.requireNonNull(service.getTcpipForwarder(), "No forwarder"); + return Objects.requireNonNull(service.getForwardingFilter(), "No forwarder"); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java index 534ac0e..250dc63 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java @@ -30,8 +30,8 @@ import org.apache.sshd.common.cipher.Cipher; import org.apache.sshd.common.compression.Compression; import org.apache.sshd.common.file.FileSystemFactory; import org.apache.sshd.common.file.nativefs.NativeFileSystemFactory; -import org.apache.sshd.common.forward.DefaultTcpipForwarderFactory; -import org.apache.sshd.common.forward.TcpipForwarderFactory; +import org.apache.sshd.common.forward.DefaultForwarderFactory; +import org.apache.sshd.common.forward.ForwardingFilterFactory; import org.apache.sshd.common.helpers.AbstractFactoryManager; import org.apache.sshd.common.kex.BuiltinDHFactories; import org.apache.sshd.common.kex.KeyExchange; @@ -59,7 +59,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder public static final ForwardingFilter DEFAULT_FORWARDING_FILTER = RejectAllForwardingFilter.INSTANCE; - public static final TcpipForwarderFactory DEFAULT_FORWARDER_FACTORY = DefaultTcpipForwarderFactory.INSTANCE; + public static final ForwardingFilterFactory DEFAULT_FORWARDER_FACTORY = DefaultForwarderFactory.INSTANCE; /** * The default {@link BuiltinCiphers} setup in order of preference @@ -138,7 +138,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder protected Factory<Random> randomFactory; protected List<NamedFactory<Channel>> channelFactories; protected FileSystemFactory fileSystemFactory; - protected TcpipForwarderFactory tcpipForwarderFactory; + protected ForwardingFilterFactory forwarderFactory; protected List<RequestHandler<ConnectionService>> globalRequestHandlers; protected ForwardingFilter forwardingFilter; @@ -171,8 +171,8 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder forwardingFilter = DEFAULT_FORWARDING_FILTER; } - if (tcpipForwarderFactory == null) { - tcpipForwarderFactory = DEFAULT_FORWARDER_FACTORY; + if (forwarderFactory == null) { + forwarderFactory = DEFAULT_FORWARDER_FACTORY; } return me(); @@ -223,8 +223,8 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder return me(); } - public S tcpipForwarderFactory(TcpipForwarderFactory tcpipForwarderFactory) { - this.tcpipForwarderFactory = tcpipForwarderFactory; + public S forwarderFactory(ForwardingFilterFactory forwarderFactory) { + this.forwarderFactory = forwarderFactory; return me(); } @@ -253,8 +253,8 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder ssh.setMacFactories(macFactories); ssh.setChannelFactories(channelFactories); ssh.setFileSystemFactory(fileSystemFactory); - ssh.setTcpipForwardingFilter(forwardingFilter); - ssh.setTcpipForwarderFactory(tcpipForwarderFactory); + ssh.setForwardingFilter(forwardingFilter); + ssh.setForwarderFactory(forwarderFactory); ssh.setGlobalRequestHandlers(globalRequestHandlers); return ssh; } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java index 8652cd8..71e06da 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java @@ -27,15 +27,18 @@ import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelListenerManager; import org.apache.sshd.common.channel.RequestHandler; import org.apache.sshd.common.file.FileSystemFactory; +import org.apache.sshd.common.forward.ForwardingFilterFactory; import org.apache.sshd.common.forward.PortForwardingEventListenerManager; -import org.apache.sshd.common.forward.TcpipForwarderFactory; import org.apache.sshd.common.io.IoServiceFactory; import org.apache.sshd.common.kex.KexFactoryManager; import org.apache.sshd.common.random.Random; import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.session.ReservedSessionMessagesManager; import org.apache.sshd.common.session.SessionListenerManager; +import org.apache.sshd.server.forward.AgentForwardingFilter; import org.apache.sshd.server.forward.ForwardingFilter; +import org.apache.sshd.server.forward.TcpForwardingFilter; +import org.apache.sshd.server.forward.X11ForwardingFilter; /** * This interface allows retrieving all the <code>NamedFactory</code> used @@ -415,14 +418,26 @@ public interface FactoryManager * * @return The {@link ForwardingFilter} or {@code null} */ - ForwardingFilter getTcpipForwardingFilter(); + ForwardingFilter getForwardingFilter(); + + default TcpForwardingFilter getTcpForwardingFilter() { + return getForwardingFilter(); + } + + default AgentForwardingFilter getAgentForwardingFilter() { + return getForwardingFilter(); + } + + default X11ForwardingFilter getX11ForwardingFilter() { + return getForwardingFilter(); + } /** - * Retrieve the tcpip forwarder factory used to support tcpip forwarding. + * Retrieve the forwarder factory used to support forwarding. * - * @return The {@link TcpipForwarderFactory} + * @return The {@link ForwardingFilterFactory} */ - TcpipForwarderFactory getTcpipForwarderFactory(); + ForwardingFilterFactory getForwarderFactory(); /** * Retrieve the <code>FileSystemFactory</code> to be used to traverse the file system. http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/config/AllowTcpForwardingValue.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/config/AllowTcpForwardingValue.java b/sshd-core/src/main/java/org/apache/sshd/common/config/AllowTcpForwardingValue.java deleted file mode 100644 index c641f33..0000000 --- a/sshd-core/src/main/java/org/apache/sshd/common/config/AllowTcpForwardingValue.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sshd.common.config; - -import java.util.Collections; -import java.util.EnumSet; -import java.util.Set; - -import org.apache.sshd.common.util.GenericUtils; - -/** - * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> - * @see <A HREF="http://www.freebsd.org/cgi/man.cgi?query=sshd_config&sektion=5">sshd_config(5) section</A> - */ -public enum AllowTcpForwardingValue { - ALL, - NONE, - LOCAL, - REMOTE; - - public static final Set<AllowTcpForwardingValue> VALUES = - Collections.unmodifiableSet(EnumSet.allOf(AllowTcpForwardingValue.class)); - - // NOTE: it also interprets "yes" as "all" and "no" as "none" - public static AllowTcpForwardingValue fromString(String s) { - if (GenericUtils.isEmpty(s)) { - return null; - } - - if ("yes".equalsIgnoreCase(s)) { - return ALL; - } - - if ("no".equalsIgnoreCase(s)) { - return NONE; - } - - for (AllowTcpForwardingValue v : VALUES) { - if (s.equalsIgnoreCase(v.name())) { - return v; - } - } - - return null; - } -} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/config/SshConfigFileReader.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/config/SshConfigFileReader.java b/sshd-core/src/main/java/org/apache/sshd/common/config/SshConfigFileReader.java index 7f6b5e8..b6d87d4 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/config/SshConfigFileReader.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/config/SshConfigFileReader.java @@ -78,21 +78,8 @@ public final class SshConfigFileReader { public static final char COMMENT_CHAR = '#'; - // Some well known configuration properties names and values - public static final String BANNER_CONFIG_PROP = "Banner"; - public static final String VISUAL_HOST_KEY = "VisualHostKey"; - public static final String DEFAULT_VISUAL_HOST_KEY = "no"; public static final String COMPRESSION_PROP = "Compression"; public static final String DEFAULT_COMPRESSION = CompressionConfigValue.NO.getName(); - public static final String ALLOW_TCP_FORWARDING_CONFIG_PROP = "AllowTcpForwarding"; - public static final String DEFAULT_TCP_FORWARDING = "yes"; - public static final boolean DEFAULT_TCP_FORWARDING_VALUE = parseBooleanValue(DEFAULT_TCP_FORWARDING); - public static final String ALLOW_AGENT_FORWARDING_CONFIG_PROP = "AllowAgentForwarding"; - public static final String DEFAULT_AGENT_FORWARDING = "yes"; - public static final boolean DEFAULT_AGENT_FORWARDING_VALUE = parseBooleanValue(DEFAULT_AGENT_FORWARDING); - public static final String ALLOW_X11_FORWARDING_CONFIG_PROP = "X11Forwarding"; - public static final String DEFAULT_X11_FORWARDING = "yes"; - public static final boolean DEFAULT_X11_FORWARDING_VALUE = parseBooleanValue(DEFAULT_X11_FORWARDING); public static final String MAX_SESSIONS_CONFIG_PROP = "MaxSessions"; public static final int DEFAULT_MAX_SESSIONS = 10; public static final String PASSWORD_AUTH_CONFIG_PROP = "PasswordAuthentication"; http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarderFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarderFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarderFactory.java new file mode 100644 index 0000000..ace0562 --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarderFactory.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.forward; + +import java.util.Collection; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.apache.sshd.common.session.ConnectionService; +import org.apache.sshd.common.util.EventListenerUtils; + +/** + * The default {@link ForwardingFilterFactory} implementation. + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class DefaultForwarderFactory implements ForwardingFilterFactory, PortForwardingEventListenerManager { + public static final DefaultForwarderFactory INSTANCE = new DefaultForwarderFactory() { + @Override + public void addPortForwardingEventListener(PortForwardingEventListener listener) { + throw new UnsupportedOperationException("addPortForwardingListener(" + listener + ") N/A on default instance"); + } + + @Override + public void removePortForwardingEventListener(PortForwardingEventListener listener) { + throw new UnsupportedOperationException("removePortForwardingEventListener(" + listener + ") N/A on default instance"); + } + + @Override + public PortForwardingEventListener getPortForwardingEventListenerProxy() { + return PortForwardingEventListener.EMPTY; + } + }; + + private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>(); + private final PortForwardingEventListener listenerProxy; + + public DefaultForwarderFactory() { + listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners); + } + + @Override + public PortForwardingEventListener getPortForwardingEventListenerProxy() { + return listenerProxy; + } + + @Override + public void addPortForwardingEventListener(PortForwardingEventListener listener) { + listeners.add(PortForwardingEventListener.validateListener(listener)); + } + + @Override + public void removePortForwardingEventListener(PortForwardingEventListener listener) { + if (listener == null) { + return; + } + + listeners.remove(PortForwardingEventListener.validateListener(listener)); + } + + @Override + public ForwardingFilter create(ConnectionService service) { + ForwardingFilter forwarder = new DefaultForwardingFilter(service); + forwarder.addPortForwardingEventListenerManager(this); + return forwarder; + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java new file mode 100644 index 0000000..64f37db --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java @@ -0,0 +1,1003 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.forward; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; + +import org.apache.sshd.client.channel.ClientChannelEvent; +import org.apache.sshd.common.Closeable; +import org.apache.sshd.common.Factory; +import org.apache.sshd.common.FactoryManager; +import org.apache.sshd.common.RuntimeSshException; +import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.SshException; +import org.apache.sshd.common.io.IoAcceptor; +import org.apache.sshd.common.io.IoHandler; +import org.apache.sshd.common.io.IoHandlerFactory; +import org.apache.sshd.common.io.IoServiceFactory; +import org.apache.sshd.common.io.IoSession; +import org.apache.sshd.common.session.ConnectionService; +import org.apache.sshd.common.session.Session; +import org.apache.sshd.common.session.SessionHolder; +import org.apache.sshd.common.util.EventListenerUtils; +import org.apache.sshd.common.util.GenericUtils; +import org.apache.sshd.common.util.Invoker; +import org.apache.sshd.common.util.Readable; +import org.apache.sshd.common.util.ValidateUtils; +import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.apache.sshd.common.util.closeable.AbstractInnerCloseable; +import org.apache.sshd.common.util.net.SshdSocketAddress; +import org.apache.sshd.server.forward.TcpForwardingFilter; + +/** + * Requests a "tcpip-forward" action + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class DefaultForwardingFilter + extends AbstractInnerCloseable + implements ForwardingFilter, SessionHolder<Session>, PortForwardingEventListenerManager { + + /** + * Used to configure the timeout (milliseconds) for receiving a response + * for the forwarding request + * + * @see #DEFAULT_FORWARD_REQUEST_TIMEOUT + */ + public static final String FORWARD_REQUEST_TIMEOUT = "tcpip-forward-request-timeout"; + + /** + * Default value for {@link #FORWARD_REQUEST_TIMEOUT} if none specified + */ + public static final long DEFAULT_FORWARD_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(15L); + + public static final Set<ClientChannelEvent> STATIC_IO_MSG_RECEIVED_EVENTS = + Collections.unmodifiableSet(EnumSet.of(ClientChannelEvent.OPENED, ClientChannelEvent.CLOSED)); + + private final ConnectionService service; + private final IoHandlerFactory socksProxyIoHandlerFactory = () -> new SocksProxy(getConnectionService()); + private final Session sessionInstance; + private final Map<Integer, SshdSocketAddress> localToRemote = new TreeMap<>(Comparator.naturalOrder()); + private final Map<Integer, SshdSocketAddress> remoteToLocal = new TreeMap<>(Comparator.naturalOrder()); + private final Map<Integer, SocksProxy> dynamicLocal = new TreeMap<>(Comparator.naturalOrder()); + private final Set<LocalForwardingEntry> localForwards = new HashSet<>(); + private final IoHandlerFactory staticIoHandlerFactory = StaticIoHandler::new; + private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>(); + private final Collection<PortForwardingEventListenerManager> managersHolder = new CopyOnWriteArraySet<>(); + private final PortForwardingEventListener listenerProxy; + + private IoAcceptor acceptor; + + public DefaultForwardingFilter(ConnectionService service) { + this.service = Objects.requireNonNull(service, "No connection service"); + this.sessionInstance = Objects.requireNonNull(service.getSession(), "No session"); + this.listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners); + } + + @Override + public PortForwardingEventListener getPortForwardingEventListenerProxy() { + return listenerProxy; + } + + @Override + public void addPortForwardingEventListener(PortForwardingEventListener listener) { + listeners.add(PortForwardingEventListener.validateListener(listener)); + } + + @Override + public void removePortForwardingEventListener(PortForwardingEventListener listener) { + if (listener == null) { + return; + } + + listeners.remove(PortForwardingEventListener.validateListener(listener)); + } + + @Override + public Collection<PortForwardingEventListenerManager> getRegisteredManagers() { + return managersHolder.isEmpty() ? Collections.emptyList() : new ArrayList<>(managersHolder); + } + + @Override + public boolean addPortForwardingEventListenerManager(PortForwardingEventListenerManager manager) { + return managersHolder.add(Objects.requireNonNull(manager, "No manager")); + } + + @Override + public boolean removePortForwardingEventListenerManager(PortForwardingEventListenerManager manager) { + if (manager == null) { + return false; + } + + return managersHolder.remove(manager); + } + + @Override + public Session getSession() { + return sessionInstance; + } + + public final ConnectionService getConnectionService() { + return service; + } + + protected Collection<PortForwardingEventListener> getDefaultListeners() { + Collection<PortForwardingEventListener> defaultListeners = new ArrayList<>(); + defaultListeners.add(getPortForwardingEventListenerProxy()); + + Session session = getSession(); + PortForwardingEventListener l = session.getPortForwardingEventListenerProxy(); + if (l != null) { + defaultListeners.add(l); + } + + FactoryManager manager = (session == null) ? null : session.getFactoryManager(); + l = (manager == null) ? null : manager.getPortForwardingEventListenerProxy(); + if (l != null) { + defaultListeners.add(l); + } + + return defaultListeners; + } + + @Override + public synchronized SshdSocketAddress startLocalPortForwarding(SshdSocketAddress local, SshdSocketAddress remote) throws IOException { + Objects.requireNonNull(local, "Local address is null"); + ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local); + Objects.requireNonNull(remote, "Remote address is null"); + + if (isClosed()) { + throw new IllegalStateException("TcpipForwarder is closed"); + } + if (isClosing()) { + throw new IllegalStateException("TcpipForwarder is closing"); + } + + InetSocketAddress bound; + int port; + signalEstablishingExplicitTunnel(local, remote, true); + try { + bound = doBind(local, staticIoHandlerFactory); + port = bound.getPort(); + SshdSocketAddress prev; + synchronized (localToRemote) { + prev = localToRemote.put(port, remote); + } + + if (prev != null) { + throw new IOException("Multiple local port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev); + } + } catch (IOException | RuntimeException e) { + try { + stopLocalPortForwarding(local); + } catch (IOException | RuntimeException err) { + e.addSuppressed(err); + } + signalEstablishedExplicitTunnel(local, remote, true, null, e); + throw e; + } + + try { + SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port); + if (log.isDebugEnabled()) { + log.debug("startLocalPortForwarding(" + local + " -> " + remote + "): " + result); + } + signalEstablishedExplicitTunnel(local, remote, true, result, null); + return result; + } catch (IOException | RuntimeException e) { + stopLocalPortForwarding(local); + throw e; + } + } + + @Override + public synchronized void stopLocalPortForwarding(SshdSocketAddress local) throws IOException { + Objects.requireNonNull(local, "Local address is null"); + + SshdSocketAddress bound; + synchronized (localToRemote) { + bound = localToRemote.remove(local.getPort()); + } + + if ((bound != null) && (acceptor != null)) { + if (log.isDebugEnabled()) { + log.debug("stopLocalPortForwarding(" + local + ") unbind " + bound); + } + + signalTearingDownExplicitTunnel(bound, true); + try { + acceptor.unbind(bound.toInetSocketAddress()); + } catch (RuntimeException e) { + signalTornDownExplicitTunnel(bound, true, e); + throw e; + } + + signalTornDownExplicitTunnel(bound, true, null); + } else { + if (log.isDebugEnabled()) { + log.debug("stopLocalPortForwarding(" + local + ") no mapping/acceptor for " + bound); + } + } + } + + @Override + public synchronized SshdSocketAddress startRemotePortForwarding(SshdSocketAddress remote, SshdSocketAddress local) throws IOException { + Objects.requireNonNull(local, "Local address is null"); + Objects.requireNonNull(remote, "Remote address is null"); + + String remoteHost = remote.getHostName(); + int remotePort = remote.getPort(); + Session session = getSession(); + Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, remoteHost.length() + Long.SIZE); + buffer.putString("tcpip-forward"); + buffer.putBoolean(true); // want reply + buffer.putString(remoteHost); + buffer.putInt(remotePort); + + long timeout = session.getLongProperty(FORWARD_REQUEST_TIMEOUT, DEFAULT_FORWARD_REQUEST_TIMEOUT); + Buffer result; + int port; + signalEstablishingExplicitTunnel(local, remote, false); + try { + result = session.request("tcpip-forward", buffer, timeout, TimeUnit.MILLISECONDS); + if (result == null) { + throw new SshException("Tcpip forwarding request denied by server"); + } + port = (remotePort == 0) ? result.getInt() : remote.getPort(); + // TODO: Is it really safe to only store the local address after the request ? + SshdSocketAddress prev; + synchronized (remoteToLocal) { + prev = remoteToLocal.put(port, local); + } + + if (prev != null) { + throw new IOException("Multiple remote port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev); + } + } catch (IOException | RuntimeException e) { + try { + stopRemotePortForwarding(remote); + } catch (IOException | RuntimeException err) { + e.addSuppressed(err); + } + signalEstablishedExplicitTunnel(local, remote, false, null, e); + throw e; + } + + try { + SshdSocketAddress bound = new SshdSocketAddress(remoteHost, port); + if (log.isDebugEnabled()) { + log.debug("startRemotePortForwarding(" + remote + " -> " + local + "): " + bound); + } + + signalEstablishedExplicitTunnel(local, remote, false, bound, null); + return bound; + } catch (IOException | RuntimeException e) { + stopRemotePortForwarding(remote); + throw e; + } + } + + @Override + public synchronized void stopRemotePortForwarding(SshdSocketAddress remote) throws IOException { + SshdSocketAddress bound; + synchronized (remoteToLocal) { + bound = remoteToLocal.remove(remote.getPort()); + } + + if (bound != null) { + if (log.isDebugEnabled()) { + log.debug("stopRemotePortForwarding(" + remote + ") cancel forwarding to " + bound); + } + + String remoteHost = remote.getHostName(); + Session session = getSession(); + Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, remoteHost.length() + Long.SIZE); + buffer.putString("cancel-tcpip-forward"); + buffer.putBoolean(false); // want reply + buffer.putString(remoteHost); + buffer.putInt(remote.getPort()); + + signalTearingDownExplicitTunnel(bound, false); + try { + session.writePacket(buffer); + } catch (IOException | RuntimeException e) { + signalTornDownExplicitTunnel(bound, false, e); + throw e; + } + + signalTornDownExplicitTunnel(bound, false, null); + } else { + if (log.isDebugEnabled()) { + log.debug("stopRemotePortForwarding(" + remote + ") no binding found"); + } + } + } + + protected void signalTearingDownExplicitTunnel(SshdSocketAddress boundAddress, boolean localForwarding) throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalTearingDownExplicitTunnel(l, boundAddress, localForwarding); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal tearing down explicit tunnel for local=" + localForwarding + + " on bound=" + boundAddress, t); + } + } + } + + protected void signalTearingDownExplicitTunnel( + PortForwardingEventListener listener, SshdSocketAddress boundAddress, boolean localForwarding) + throws IOException { + if (listener == null) { + return; + } + + listener.tearingDownExplicitTunnel(getSession(), boundAddress, localForwarding); + } + + protected void signalTornDownExplicitTunnel(SshdSocketAddress boundAddress, boolean localForwarding, Throwable reason) throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalTornDownExplicitTunnel(l, boundAddress, localForwarding, reason); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal torn down explicit tunnel local=" + localForwarding + + " on bound=" + boundAddress, t); + } + } + } + + protected void signalTornDownExplicitTunnel( + PortForwardingEventListener listener, SshdSocketAddress boundAddress, boolean localForwarding, Throwable reason) + throws IOException { + if (listener == null) { + return; + } + + listener.tornDownExplicitTunnel(getSession(), boundAddress, localForwarding, reason); + } + + @Override + public synchronized SshdSocketAddress startDynamicPortForwarding(SshdSocketAddress local) throws IOException { + Objects.requireNonNull(local, "Local address is null"); + ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local); + + if (isClosed()) { + throw new IllegalStateException("TcpipForwarder is closed"); + } + if (isClosing()) { + throw new IllegalStateException("TcpipForwarder is closing"); + } + + SocksProxy socksProxy = new SocksProxy(service); + SocksProxy prev; + InetSocketAddress bound; + int port; + signalEstablishingDynamicTunnel(local); + try { + bound = doBind(local, socksProxyIoHandlerFactory); + port = bound.getPort(); + synchronized (dynamicLocal) { + prev = dynamicLocal.put(port, socksProxy); + } + + if (prev != null) { + throw new IOException("Multiple dynamic port mappings found for port=" + port + ": current=" + socksProxy + ", previous=" + prev); + } + } catch (IOException | RuntimeException e) { + try { + stopDynamicPortForwarding(local); + } catch (IOException | RuntimeException err) { + e.addSuppressed(err); + } + signalEstablishedDynamicTunnel(local, null, e); + throw e; + } + + try { + SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port); + if (log.isDebugEnabled()) { + log.debug("startDynamicPortForwarding(" + local + "): " + result); + } + + signalEstablishedDynamicTunnel(local, result, null); + return result; + } catch (IOException | RuntimeException e) { + stopDynamicPortForwarding(local); + throw e; + } + } + + protected void signalEstablishedDynamicTunnel( + SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason) + throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalEstablishedDynamicTunnel(l, local, boundAddress, reason); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal establishing dynamic tunnel for local=" + local + + " on bound=" + boundAddress, t); + } + } + } + + protected void signalEstablishedDynamicTunnel(PortForwardingEventListener listener, + SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason) + throws IOException { + if (listener == null) { + return; + } + + listener.establishedDynamicTunnel(getSession(), local, boundAddress, reason); + } + + protected void signalEstablishingDynamicTunnel(SshdSocketAddress local) throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalEstablishingDynamicTunnel(l, local); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal establishing dynamic tunnel for local=" + local, t); + } + } + } + + protected void signalEstablishingDynamicTunnel(PortForwardingEventListener listener, SshdSocketAddress local) throws IOException { + if (listener == null) { + return; + } + + listener.establishingDynamicTunnel(getSession(), local); + } + + @Override + public synchronized void stopDynamicPortForwarding(SshdSocketAddress local) throws IOException { + SocksProxy obj; + synchronized (dynamicLocal) { + obj = dynamicLocal.remove(local.getPort()); + } + + if (obj != null) { + if (log.isDebugEnabled()) { + log.debug("stopDynamicPortForwarding(" + local + ") unbinding"); + } + + signalTearingDownDynamicTunnel(local); + try { + obj.close(true); + acceptor.unbind(local.toInetSocketAddress()); + } catch (RuntimeException e) { + signalTornDownDynamicTunnel(local, e); + throw e; + } + + signalTornDownDynamicTunnel(local, null); + } else { + if (log.isDebugEnabled()) { + log.debug("stopDynamicPortForwarding(" + local + ") no binding found"); + } + } + } + + protected void signalTearingDownDynamicTunnel(SshdSocketAddress address) throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalTearingDownDynamicTunnel(l, address); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal tearing down dynamic tunnel for address=" + address, t); + } + } + } + + protected void signalTearingDownDynamicTunnel(PortForwardingEventListener listener, SshdSocketAddress address) throws IOException { + if (listener == null) { + return; + } + + listener.tearingDownDynamicTunnel(getSession(), address); + } + + protected void signalTornDownDynamicTunnel(SshdSocketAddress address, Throwable reason) throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalTornDownDynamicTunnel(l, address, reason); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal torn down dynamic tunnel for address=" + address, t); + } + } + } + + protected void signalTornDownDynamicTunnel( + PortForwardingEventListener listener, SshdSocketAddress address, Throwable reason) + throws IOException { + if (listener == null) { + return; + } + + listener.tornDownDynamicTunnel(getSession(), address, reason); + } + + @Override + public synchronized SshdSocketAddress getForwardedPort(int remotePort) { + synchronized (remoteToLocal) { + return remoteToLocal.get(remotePort); + } + } + + @Override + public synchronized SshdSocketAddress localPortForwardingRequested(SshdSocketAddress local) throws IOException { + Objects.requireNonNull(local, "Local address is null"); + ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local); + + Session session = getSession(); + FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager"); + TcpForwardingFilter filter = manager.getTcpForwardingFilter(); + try { + if ((filter == null) || (!filter.canListen(local, session))) { + if (log.isDebugEnabled()) { + log.debug("localPortForwardingRequested(" + session + ")[" + local + "][haveFilter=" + (filter != null) + "] rejected"); + } + return null; + } + } catch (Error e) { + log.warn("localPortForwardingRequested({})[{}] failed ({}) to consult forwarding filter: {}", + session, local, e.getClass().getSimpleName(), e.getMessage()); + if (log.isDebugEnabled()) { + log.debug("localPortForwardingRequested(" + this + ")[" + local + "] filter consultation failure details", e); + } + throw new RuntimeSshException(e); + } + + signalEstablishingExplicitTunnel(local, null, true); + SshdSocketAddress result; + try { + InetSocketAddress bound = doBind(local, staticIoHandlerFactory); + result = new SshdSocketAddress(bound.getHostString(), bound.getPort()); + if (log.isDebugEnabled()) { + log.debug("localPortForwardingRequested(" + local + "): " + result); + } + + boolean added; + synchronized (localForwards) { + // NOTE !!! it is crucial to use the bound address host name first + added = localForwards.add(new LocalForwardingEntry(result.getHostName(), local.getHostName(), result.getPort())); + } + + if (!added) { + throw new IOException("Failed to add local port forwarding entry for " + local + " -> " + result); + } + } catch (IOException | RuntimeException e) { + try { + localPortForwardingCancelled(local); + } catch (IOException | RuntimeException err) { + e.addSuppressed(e); + } + signalEstablishedExplicitTunnel(local, null, true, null, e); + throw e; + } + + try { + signalEstablishedExplicitTunnel(local, null, true, result, null); + return result; + } catch (IOException | RuntimeException e) { + throw e; + } + } + + @Override + public synchronized void localPortForwardingCancelled(SshdSocketAddress local) throws IOException { + LocalForwardingEntry entry; + synchronized (localForwards) { + entry = LocalForwardingEntry.findMatchingEntry(local.getHostName(), local.getPort(), localForwards); + if (entry != null) { + localForwards.remove(entry); + } + } + + if ((entry != null) && (acceptor != null)) { + if (log.isDebugEnabled()) { + log.debug("localPortForwardingCancelled(" + local + ") unbind " + entry); + } + + signalTearingDownExplicitTunnel(entry, true); + try { + acceptor.unbind(entry.toInetSocketAddress()); + } catch (RuntimeException e) { + signalTornDownExplicitTunnel(entry, true, e); + throw e; + } + + signalTornDownExplicitTunnel(entry, true, null); + } else { + if (log.isDebugEnabled()) { + log.debug("localPortForwardingCancelled(" + local + ") no match/acceptor: " + entry); + } + } + } + + protected void signalEstablishingExplicitTunnel( + SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding) + throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalEstablishingExplicitTunnel(l, local, remote, localForwarding); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal establishing explicit tunnel for local=" + local + + ", remote=" + remote + ", localForwarding=" + localForwarding, t); + } + } + } + + protected void signalEstablishingExplicitTunnel(PortForwardingEventListener listener, + SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding) + throws IOException { + if (listener == null) { + return; + } + + listener.establishingExplicitTunnel(getSession(), local, remote, localForwarding); + } + + protected void signalEstablishedExplicitTunnel( + SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding, + SshdSocketAddress boundAddress, Throwable reason) + throws IOException { + try { + invokePortEventListenerSignaller(l -> { + signalEstablishedExplicitTunnel(l, local, remote, localForwarding, boundAddress, reason); + return null; + }); + } catch (Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Failed (" + t.getClass().getSimpleName() + ")" + + " to signal established explicit tunnel for local=" + local + + ", remote=" + remote + ", localForwarding=" + localForwarding + + ", bound=" + boundAddress, t); + } + } + } + + protected void signalEstablishedExplicitTunnel(PortForwardingEventListener listener, + SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding, + SshdSocketAddress boundAddress, Throwable reason) + throws IOException { + if (listener == null) { + return; + } + + listener.establishedExplicitTunnel(getSession(), local, remote, localForwarding, boundAddress, reason); + } + + protected void invokePortEventListenerSignaller(Invoker<PortForwardingEventListener, Void> invoker) throws Throwable { + Throwable err = null; + try { + invokePortEventListenerSignallerListeners(getDefaultListeners(), invoker); + } catch (Throwable t) { + Throwable e = GenericUtils.peelException(t); + err = GenericUtils.accumulateException(err, e); + } + + try { + invokePortEventListenerSignallerHolders(managersHolder, invoker); + } catch (Throwable t) { + Throwable e = GenericUtils.peelException(t); + err = GenericUtils.accumulateException(err, e); + } + + + if (err != null) { + throw err; + } + } + + protected void invokePortEventListenerSignallerListeners( + Collection<? extends PortForwardingEventListener> listeners, Invoker<PortForwardingEventListener, Void> invoker) + throws Throwable { + if (GenericUtils.isEmpty(listeners)) { + return; + } + + Throwable err = null; + // Need to go over the hierarchy (session, factory managed, connection service, etc...) + for (PortForwardingEventListener l : listeners) { + if (l == null) { + continue; + } + + try { + invoker.invoke(l); + } catch (Throwable t) { + Throwable e = GenericUtils.peelException(t); + err = GenericUtils.accumulateException(err, e); + } + } + + if (err != null) { + throw err; + } + } + + protected void invokePortEventListenerSignallerHolders( + Collection<? extends PortForwardingEventListenerManager> holders, Invoker<PortForwardingEventListener, Void> invoker) + throws Throwable { + if (GenericUtils.isEmpty(holders)) { + return; + } + + Throwable err = null; + // Need to go over the hierarchy (session, factory managed, connection service, etc...) + for (PortForwardingEventListenerManager m : holders) { + try { + PortForwardingEventListener listener = m.getPortForwardingEventListenerProxy(); + if (listener != null) { + invoker.invoke(listener); + } + } catch (Throwable t) { + Throwable e = GenericUtils.peelException(t); + err = GenericUtils.accumulateException(err, e); + } + + if (m instanceof PortForwardingEventListenerManagerHolder) { + try { + invokePortEventListenerSignallerHolders(((PortForwardingEventListenerManagerHolder) m).getRegisteredManagers(), invoker); + } catch (Throwable t) { + Throwable e = GenericUtils.peelException(t); + err = GenericUtils.accumulateException(err, e); + } + } + } + + if (err != null) { + throw err; + } + } + + @Override + protected synchronized Closeable getInnerCloseable() { + return builder().parallel(dynamicLocal.values()).close(acceptor).build(); + } + + @Override + protected void preClose() { + this.listeners.clear(); + this.managersHolder.clear(); + super.preClose(); + } + + /** + * @param address The request bind address + * @param handlerFactory A {@link Factory} to create an {@link IoHandler} if necessary + * @return The {@link InetSocketAddress} to which the binding occurred + * @throws IOException If failed to bind + */ + private InetSocketAddress doBind(SshdSocketAddress address, Factory<? extends IoHandler> handlerFactory) throws IOException { + if (acceptor == null) { + Session session = getSession(); + FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager"); + IoServiceFactory factory = Objects.requireNonNull(manager.getIoServiceFactory(), "No I/O service factory"); + IoHandler handler = handlerFactory.create(); + acceptor = factory.createAcceptor(handler); + } + + // TODO find a better way to determine the resulting bind address - what if multi-threaded calls... + Set<SocketAddress> before = acceptor.getBoundAddresses(); + try { + InetSocketAddress bindAddress = address.toInetSocketAddress(); + acceptor.bind(bindAddress); + + Set<SocketAddress> after = acceptor.getBoundAddresses(); + if (GenericUtils.size(after) > 0) { + after.removeAll(before); + } + if (GenericUtils.isEmpty(after)) { + throw new IOException("Error binding to " + address + "[" + bindAddress + "]: no local addresses bound"); + } + + if (after.size() > 1) { + throw new IOException("Multiple local addresses have been bound for " + address + "[" + bindAddress + "]"); + } + return (InetSocketAddress) after.iterator().next(); + } catch (IOException bindErr) { + Set<SocketAddress> after = acceptor.getBoundAddresses(); + if (GenericUtils.isEmpty(after)) { + close(); + } + throw bindErr; + } + } + + @Override + public String toString() { + return getClass().getSimpleName() + "[" + getSession() + "]"; + } + + // + // Static IoHandler implementation + // + + class StaticIoHandler implements IoHandler { + StaticIoHandler() { + super(); + } + + @Override + @SuppressWarnings("synthetic-access") + public void sessionCreated(final IoSession session) throws Exception { + InetSocketAddress local = (InetSocketAddress) session.getLocalAddress(); + int localPort = local.getPort(); + SshdSocketAddress remote = localToRemote.get(localPort); + if (log.isDebugEnabled()) { + log.debug("sessionCreated({}) remote={}", session, remote); + } + + final TcpipClientChannel channel; + if (remote != null) { + channel = new TcpipClientChannel(TcpipClientChannel.Type.Direct, session, remote); + } else { + channel = new TcpipClientChannel(TcpipClientChannel.Type.Forwarded, session, null); + } + session.setAttribute(TcpipClientChannel.class, channel); + + service.registerChannel(channel); + channel.open().addListener(future -> { + Throwable t = future.getException(); + if (t != null) { + log.warn("Failed ({}) to open channel for session={}: {}", + t.getClass().getSimpleName(), session, t.getMessage()); + if (log.isDebugEnabled()) { + log.debug("sessionCreated(" + session + ") channel=" + channel + " open failure details", t); + } + DefaultForwardingFilter.this.service.unregisterChannel(channel); + channel.close(false); + } + }); + } + + @Override + @SuppressWarnings("synthetic-access") + public void sessionClosed(IoSession session) throws Exception { + TcpipClientChannel channel = (TcpipClientChannel) session.getAttribute(TcpipClientChannel.class); + if (channel != null) { + if (log.isDebugEnabled()) { + log.debug("sessionClosed({}) closing channel={}", session, channel); + } + channel.close(false); + } + } + + @Override + @SuppressWarnings("synthetic-access") + public void messageReceived(IoSession session, Readable message) throws Exception { + TcpipClientChannel channel = (TcpipClientChannel) session.getAttribute(TcpipClientChannel.class); + Buffer buffer = new ByteArrayBuffer(message.available() + Long.SIZE, false); + buffer.putBuffer(message); + + Collection<ClientChannelEvent> result = channel.waitFor(STATIC_IO_MSG_RECEIVED_EVENTS, Long.MAX_VALUE); + if (log.isTraceEnabled()) { + log.trace("messageReceived({}) channel={}, len={} wait result: {}", + session, channel, result, buffer.array()); + } + + OutputStream outputStream = channel.getInvertedIn(); + outputStream.write(buffer.array(), buffer.rpos(), buffer.available()); + outputStream.flush(); + } + + @Override + @SuppressWarnings("synthetic-access") + public void exceptionCaught(IoSession session, Throwable cause) throws Exception { + if (log.isDebugEnabled()) { + log.debug("exceptionCaught({}) {}: {}", session, cause.getClass().getSimpleName(), cause.getMessage()); + } + if (log.isTraceEnabled()) { + log.trace("exceptionCaught(" + session + ") caught exception details", cause); + } + session.close(false); + } + } +}