[SSHD-766] Separate forwarding filter functionality according to sshd-config options


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/aa551bc0
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/aa551bc0
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/aa551bc0

Branch: refs/heads/master
Commit: aa551bc0ed07430ee768e98a57b75cd56f3927e0
Parents: 306bef2
Author: Goldstein Lyor <l...@c-b4.com>
Authored: Mon Aug 28 15:12:26 2017 +0300
Committer: Lyor Goldstein <lyor.goldst...@gmail.com>
Committed: Mon Aug 28 19:34:56 2017 +0300

----------------------------------------------------------------------
 .../java/org/apache/sshd/client/SshClient.java  |    2 +-
 .../client/session/AbstractClientSession.java   |   24 +-
 .../org/apache/sshd/common/BaseBuilder.java     |   20 +-
 .../org/apache/sshd/common/FactoryManager.java  |   25 +-
 .../common/config/AllowTcpForwardingValue.java  |   63 --
 .../sshd/common/config/SshConfigFileReader.java |   13 -
 .../common/forward/DefaultForwarderFactory.java |   82 ++
 .../common/forward/DefaultForwardingFilter.java | 1003 +++++++++++++++++
 .../common/forward/DefaultTcpipForwarder.java   | 1007 ------------------
 .../forward/DefaultTcpipForwarderFactory.java   |   82 --
 .../sshd/common/forward/ForwardingFilter.java   |   59 +
 .../common/forward/ForwardingFilterFactory.java |   37 +
 .../sshd/common/forward/TcpipForwarder.java     |   54 -
 .../common/forward/TcpipForwarderFactory.java   |   37 -
 .../common/helpers/AbstractFactoryManager.java  |   22 +-
 .../sshd/common/session/ConnectionService.java  |    8 +-
 .../helpers/AbstractConnectionService.java      |   32 +-
 .../java/org/apache/sshd/server/SshServer.java  |   56 +-
 .../sshd/server/channel/ChannelSession.java     |    7 +-
 .../server/config/AllowTcpForwardingValue.java  |  106 ++
 .../config/SshServerConfigFileReader.java       |  111 ++
 .../server/forward/AgentForwardingFilter.java   |   51 +
 .../sshd/server/forward/ForwardingFilter.java   |  162 +--
 .../server/forward/TcpForwardingFilter.java     |  162 +++
 .../sshd/server/forward/TcpipServerChannel.java |    4 +-
 .../server/forward/X11ForwardingFilter.java     |   51 +
 .../global/CancelTcpipForwardHandler.java       |    4 +-
 .../sshd/server/global/TcpipForwardHandler.java |    4 +-
 .../test/java/org/apache/sshd/ProxyTest.java    |    4 +-
 .../java/org/apache/sshd/agent/AgentTest.java   |    4 +-
 .../client/ClientAuthenticationManagerTest.java |    4 +-
 .../forward/ApacheServerApacheClientTest.java   |    4 +-
 .../forward/ApacheServerJSchClientTest.java     |    2 +-
 .../common/forward/PortForwardingLoadTest.java  |    2 +-
 .../sshd/common/forward/PortForwardingTest.java |   18 +-
 .../server/subsystem/sftp/SshFsMounter.java     |    2 +-
 36 files changed, 1808 insertions(+), 1520 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java 
b/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
index c8cc1db..e4d37ab 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
@@ -378,7 +378,7 @@ public class SshClient extends AbstractFactoryManager 
implements ClientFactoryMa
     protected void checkConfig() {
         super.checkConfig();
 
-        Objects.requireNonNull(getTcpipForwarderFactory(), 
"TcpipForwarderFactory not set");
+        Objects.requireNonNull(getForwarderFactory(), "ForwarderFactory not 
set");
         Objects.requireNonNull(getServerKeyVerifier(), "ServerKeyVerifier not 
set");
         Objects.requireNonNull(getHostConfigEntryResolver(), 
"HostConfigEntryResolver not set");
         Objects.requireNonNull(getClientIdentityLoader(), 
"ClientIdentityLoader not set");

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
index 9e85843..4e9e6c9 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
@@ -58,7 +58,7 @@ import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.cipher.BuiltinCiphers;
 import org.apache.sshd.common.cipher.CipherNone;
 import org.apache.sshd.common.config.keys.KeyUtils;
-import org.apache.sshd.common.forward.TcpipForwarder;
+import org.apache.sshd.common.forward.ForwardingFilter;
 import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
 import org.apache.sshd.common.future.KeyExchangeFuture;
 import org.apache.sshd.common.io.IoSession;
@@ -388,37 +388,43 @@ public abstract class AbstractClientSession extends 
AbstractSession implements C
 
     @Override
     public SshdSocketAddress startLocalPortForwarding(SshdSocketAddress local, 
SshdSocketAddress remote) throws IOException {
-        return getTcpipForwarder().startLocalPortForwarding(local, remote);
+        ForwardingFilter filter = getForwardingFilter();
+        return filter.startLocalPortForwarding(local, remote);
     }
 
     @Override
     public void stopLocalPortForwarding(SshdSocketAddress local) throws 
IOException {
-        getTcpipForwarder().stopLocalPortForwarding(local);
+        ForwardingFilter filter = getForwardingFilter();
+        filter.stopLocalPortForwarding(local);
     }
 
     @Override
     public SshdSocketAddress startRemotePortForwarding(SshdSocketAddress 
remote, SshdSocketAddress local) throws IOException {
-        return getTcpipForwarder().startRemotePortForwarding(remote, local);
+        ForwardingFilter filter = getForwardingFilter();
+        return filter.startRemotePortForwarding(remote, local);
     }
 
     @Override
     public void stopRemotePortForwarding(SshdSocketAddress remote) throws 
IOException {
-        getTcpipForwarder().stopRemotePortForwarding(remote);
+        ForwardingFilter filter = getForwardingFilter();
+        filter.stopRemotePortForwarding(remote);
     }
 
     @Override
     public SshdSocketAddress startDynamicPortForwarding(SshdSocketAddress 
local) throws IOException {
-        return getTcpipForwarder().startDynamicPortForwarding(local);
+        ForwardingFilter filter = getForwardingFilter();
+        return filter.startDynamicPortForwarding(local);
     }
 
     @Override
     public void stopDynamicPortForwarding(SshdSocketAddress local) throws 
IOException {
-        getTcpipForwarder().stopDynamicPortForwarding(local);
+        ForwardingFilter filter = getForwardingFilter();
+        filter.stopDynamicPortForwarding(local);
     }
 
-    protected TcpipForwarder getTcpipForwarder() {
+    protected ForwardingFilter getForwardingFilter() {
         ConnectionService service = 
Objects.requireNonNull(getConnectionService(), "No connection service");
-        return Objects.requireNonNull(service.getTcpipForwarder(), "No 
forwarder");
+        return Objects.requireNonNull(service.getForwardingFilter(), "No 
forwarder");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java 
b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
index 534ac0e..250dc63 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
@@ -30,8 +30,8 @@ import org.apache.sshd.common.cipher.Cipher;
 import org.apache.sshd.common.compression.Compression;
 import org.apache.sshd.common.file.FileSystemFactory;
 import org.apache.sshd.common.file.nativefs.NativeFileSystemFactory;
-import org.apache.sshd.common.forward.DefaultTcpipForwarderFactory;
-import org.apache.sshd.common.forward.TcpipForwarderFactory;
+import org.apache.sshd.common.forward.DefaultForwarderFactory;
+import org.apache.sshd.common.forward.ForwardingFilterFactory;
 import org.apache.sshd.common.helpers.AbstractFactoryManager;
 import org.apache.sshd.common.kex.BuiltinDHFactories;
 import org.apache.sshd.common.kex.KeyExchange;
@@ -59,7 +59,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S 
extends BaseBuilder
 
     public static final ForwardingFilter DEFAULT_FORWARDING_FILTER = 
RejectAllForwardingFilter.INSTANCE;
 
-    public static final TcpipForwarderFactory DEFAULT_FORWARDER_FACTORY = 
DefaultTcpipForwarderFactory.INSTANCE;
+    public static final ForwardingFilterFactory DEFAULT_FORWARDER_FACTORY = 
DefaultForwarderFactory.INSTANCE;
 
     /**
      * The default {@link BuiltinCiphers} setup in order of preference
@@ -138,7 +138,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, 
S extends BaseBuilder
     protected Factory<Random> randomFactory;
     protected List<NamedFactory<Channel>> channelFactories;
     protected FileSystemFactory fileSystemFactory;
-    protected TcpipForwarderFactory tcpipForwarderFactory;
+    protected ForwardingFilterFactory forwarderFactory;
     protected List<RequestHandler<ConnectionService>> globalRequestHandlers;
     protected ForwardingFilter forwardingFilter;
 
@@ -171,8 +171,8 @@ public class BaseBuilder<T extends AbstractFactoryManager, 
S extends BaseBuilder
             forwardingFilter = DEFAULT_FORWARDING_FILTER;
         }
 
-        if (tcpipForwarderFactory == null) {
-            tcpipForwarderFactory = DEFAULT_FORWARDER_FACTORY;
+        if (forwarderFactory == null) {
+            forwarderFactory = DEFAULT_FORWARDER_FACTORY;
         }
 
         return me();
@@ -223,8 +223,8 @@ public class BaseBuilder<T extends AbstractFactoryManager, 
S extends BaseBuilder
         return me();
     }
 
-    public S tcpipForwarderFactory(TcpipForwarderFactory 
tcpipForwarderFactory) {
-        this.tcpipForwarderFactory = tcpipForwarderFactory;
+    public S forwarderFactory(ForwardingFilterFactory forwarderFactory) {
+        this.forwarderFactory = forwarderFactory;
         return me();
     }
 
@@ -253,8 +253,8 @@ public class BaseBuilder<T extends AbstractFactoryManager, 
S extends BaseBuilder
         ssh.setMacFactories(macFactories);
         ssh.setChannelFactories(channelFactories);
         ssh.setFileSystemFactory(fileSystemFactory);
-        ssh.setTcpipForwardingFilter(forwardingFilter);
-        ssh.setTcpipForwarderFactory(tcpipForwarderFactory);
+        ssh.setForwardingFilter(forwardingFilter);
+        ssh.setForwarderFactory(forwarderFactory);
         ssh.setGlobalRequestHandlers(globalRequestHandlers);
         return ssh;
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java 
b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 8652cd8..71e06da 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -27,15 +27,18 @@ import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelListenerManager;
 import org.apache.sshd.common.channel.RequestHandler;
 import org.apache.sshd.common.file.FileSystemFactory;
+import org.apache.sshd.common.forward.ForwardingFilterFactory;
 import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
-import org.apache.sshd.common.forward.TcpipForwarderFactory;
 import org.apache.sshd.common.io.IoServiceFactory;
 import org.apache.sshd.common.kex.KexFactoryManager;
 import org.apache.sshd.common.random.Random;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.ReservedSessionMessagesManager;
 import org.apache.sshd.common.session.SessionListenerManager;
+import org.apache.sshd.server.forward.AgentForwardingFilter;
 import org.apache.sshd.server.forward.ForwardingFilter;
+import org.apache.sshd.server.forward.TcpForwardingFilter;
+import org.apache.sshd.server.forward.X11ForwardingFilter;
 
 /**
  * This interface allows retrieving all the <code>NamedFactory</code> used
@@ -415,14 +418,26 @@ public interface FactoryManager
      *
      * @return The {@link ForwardingFilter} or {@code null}
      */
-    ForwardingFilter getTcpipForwardingFilter();
+    ForwardingFilter getForwardingFilter();
+
+    default TcpForwardingFilter getTcpForwardingFilter() {
+        return getForwardingFilter();
+    }
+
+    default AgentForwardingFilter getAgentForwardingFilter() {
+        return getForwardingFilter();
+    }
+
+    default X11ForwardingFilter getX11ForwardingFilter() {
+        return getForwardingFilter();
+    }
 
     /**
-     * Retrieve the tcpip forwarder factory used to support tcpip forwarding.
+     * Retrieve the forwarder factory used to support forwarding.
      *
-     * @return The {@link TcpipForwarderFactory}
+     * @return The {@link ForwardingFilterFactory}
      */
-    TcpipForwarderFactory getTcpipForwarderFactory();
+    ForwardingFilterFactory getForwarderFactory();
 
     /**
      * Retrieve the <code>FileSystemFactory</code> to be used to traverse the 
file system.

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/config/AllowTcpForwardingValue.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/config/AllowTcpForwardingValue.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/config/AllowTcpForwardingValue.java
deleted file mode 100644
index c641f33..0000000
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/config/AllowTcpForwardingValue.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sshd.common.config;
-
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Set;
-
-import org.apache.sshd.common.util.GenericUtils;
-
-/**
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
- * @see <A 
HREF="http://www.freebsd.org/cgi/man.cgi?query=sshd_config&sektion=5">sshd_config(5)
 section</A>
- */
-public enum AllowTcpForwardingValue {
-    ALL,
-    NONE,
-    LOCAL,
-    REMOTE;
-
-    public static final Set<AllowTcpForwardingValue> VALUES =
-            
Collections.unmodifiableSet(EnumSet.allOf(AllowTcpForwardingValue.class));
-
-    // NOTE: it also interprets "yes" as "all" and "no" as "none"
-    public static AllowTcpForwardingValue fromString(String s) {
-        if (GenericUtils.isEmpty(s)) {
-            return null;
-        }
-
-        if ("yes".equalsIgnoreCase(s)) {
-            return ALL;
-        }
-
-        if ("no".equalsIgnoreCase(s)) {
-            return NONE;
-        }
-
-        for (AllowTcpForwardingValue v : VALUES) {
-            if (s.equalsIgnoreCase(v.name())) {
-                return v;
-            }
-        }
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/config/SshConfigFileReader.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/config/SshConfigFileReader.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/config/SshConfigFileReader.java
index 7f6b5e8..b6d87d4 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/config/SshConfigFileReader.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/config/SshConfigFileReader.java
@@ -78,21 +78,8 @@ public final class SshConfigFileReader {
 
     public static final char COMMENT_CHAR = '#';
 
-    // Some well known configuration properties names and values
-    public static final String BANNER_CONFIG_PROP = "Banner";
-    public static final String VISUAL_HOST_KEY = "VisualHostKey";
-    public static final String DEFAULT_VISUAL_HOST_KEY = "no";
     public static final String COMPRESSION_PROP = "Compression";
     public static final String DEFAULT_COMPRESSION = 
CompressionConfigValue.NO.getName();
-    public static final String ALLOW_TCP_FORWARDING_CONFIG_PROP = 
"AllowTcpForwarding";
-    public static final String DEFAULT_TCP_FORWARDING = "yes";
-    public static final boolean DEFAULT_TCP_FORWARDING_VALUE = 
parseBooleanValue(DEFAULT_TCP_FORWARDING);
-    public static final String ALLOW_AGENT_FORWARDING_CONFIG_PROP = 
"AllowAgentForwarding";
-    public static final String DEFAULT_AGENT_FORWARDING = "yes";
-    public static final boolean DEFAULT_AGENT_FORWARDING_VALUE = 
parseBooleanValue(DEFAULT_AGENT_FORWARDING);
-    public static final String ALLOW_X11_FORWARDING_CONFIG_PROP = 
"X11Forwarding";
-    public static final String DEFAULT_X11_FORWARDING = "yes";
-    public static final boolean DEFAULT_X11_FORWARDING_VALUE = 
parseBooleanValue(DEFAULT_X11_FORWARDING);
     public static final String MAX_SESSIONS_CONFIG_PROP = "MaxSessions";
     public static final int DEFAULT_MAX_SESSIONS = 10;
     public static final String PASSWORD_AUTH_CONFIG_PROP = 
"PasswordAuthentication";

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarderFactory.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarderFactory.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarderFactory.java
new file mode 100644
index 0000000..ace0562
--- /dev/null
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarderFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.forward;
+
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.util.EventListenerUtils;
+
+/**
+ * The default {@link ForwardingFilterFactory} implementation.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class DefaultForwarderFactory implements ForwardingFilterFactory, 
PortForwardingEventListenerManager {
+    public static final DefaultForwarderFactory INSTANCE = new 
DefaultForwarderFactory() {
+        @Override
+        public void addPortForwardingEventListener(PortForwardingEventListener 
listener) {
+            throw new 
UnsupportedOperationException("addPortForwardingListener(" + listener + ") N/A 
on default instance");
+        }
+
+        @Override
+        public void 
removePortForwardingEventListener(PortForwardingEventListener listener) {
+            throw new 
UnsupportedOperationException("removePortForwardingEventListener(" + listener + 
") N/A on default instance");
+        }
+
+        @Override
+        public PortForwardingEventListener 
getPortForwardingEventListenerProxy() {
+            return PortForwardingEventListener.EMPTY;
+        }
+    };
+
+    private final Collection<PortForwardingEventListener> listeners = new 
CopyOnWriteArraySet<>();
+    private final PortForwardingEventListener listenerProxy;
+
+    public DefaultForwarderFactory() {
+        listenerProxy = 
EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, 
getClass().getClassLoader(), listeners);
+    }
+
+    @Override
+    public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+        return listenerProxy;
+    }
+
+    @Override
+    public void addPortForwardingEventListener(PortForwardingEventListener 
listener) {
+        listeners.add(PortForwardingEventListener.validateListener(listener));
+    }
+
+    @Override
+    public void removePortForwardingEventListener(PortForwardingEventListener 
listener) {
+        if (listener == null) {
+            return;
+        }
+
+        
listeners.remove(PortForwardingEventListener.validateListener(listener));
+    }
+
+    @Override
+    public ForwardingFilter create(ConnectionService service) {
+        ForwardingFilter forwarder = new DefaultForwardingFilter(service);
+        forwarder.addPortForwardingEventListenerManager(this);
+        return forwarder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa551bc0/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
new file mode 100644
index 0000000..64f37db
--- /dev/null
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
@@ -0,0 +1,1003 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.forward;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.client.channel.ClientChannelEvent;
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.Factory;
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.RuntimeSshException;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoHandlerFactory;
+import org.apache.sshd.common.io.IoServiceFactory;
+import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.session.SessionHolder;
+import org.apache.sshd.common.util.EventListenerUtils;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.Invoker;
+import org.apache.sshd.common.util.Readable;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
+import org.apache.sshd.common.util.net.SshdSocketAddress;
+import org.apache.sshd.server.forward.TcpForwardingFilter;
+
+/**
+ * Requests a &quot;tcpip-forward&quot; action
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class DefaultForwardingFilter
+        extends AbstractInnerCloseable
+        implements ForwardingFilter, SessionHolder<Session>, 
PortForwardingEventListenerManager {
+
+    /**
+     * Used to configure the timeout (milliseconds) for receiving a response
+     * for the forwarding request
+     *
+     * @see #DEFAULT_FORWARD_REQUEST_TIMEOUT
+     */
+    public static final String FORWARD_REQUEST_TIMEOUT = 
"tcpip-forward-request-timeout";
+
+    /**
+     * Default value for {@link #FORWARD_REQUEST_TIMEOUT} if none specified
+     */
+    public static final long DEFAULT_FORWARD_REQUEST_TIMEOUT = 
TimeUnit.SECONDS.toMillis(15L);
+
+    public static final Set<ClientChannelEvent> STATIC_IO_MSG_RECEIVED_EVENTS =
+            Collections.unmodifiableSet(EnumSet.of(ClientChannelEvent.OPENED, 
ClientChannelEvent.CLOSED));
+
+    private final ConnectionService service;
+    private final IoHandlerFactory socksProxyIoHandlerFactory = () -> new 
SocksProxy(getConnectionService());
+    private final Session sessionInstance;
+    private final Map<Integer, SshdSocketAddress> localToRemote = new 
TreeMap<>(Comparator.naturalOrder());
+    private final Map<Integer, SshdSocketAddress> remoteToLocal = new 
TreeMap<>(Comparator.naturalOrder());
+    private final Map<Integer, SocksProxy> dynamicLocal = new 
TreeMap<>(Comparator.naturalOrder());
+    private final Set<LocalForwardingEntry> localForwards = new HashSet<>();
+    private final IoHandlerFactory staticIoHandlerFactory = 
StaticIoHandler::new;
+    private final Collection<PortForwardingEventListener> listeners = new 
CopyOnWriteArraySet<>();
+    private final Collection<PortForwardingEventListenerManager> 
managersHolder = new CopyOnWriteArraySet<>();
+    private final PortForwardingEventListener listenerProxy;
+
+    private IoAcceptor acceptor;
+
+    public DefaultForwardingFilter(ConnectionService service) {
+        this.service = Objects.requireNonNull(service, "No connection 
service");
+        this.sessionInstance = Objects.requireNonNull(service.getSession(), 
"No session");
+        this.listenerProxy = 
EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, 
getClass().getClassLoader(), listeners);
+    }
+
+    @Override
+    public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+        return listenerProxy;
+    }
+
+    @Override
+    public void addPortForwardingEventListener(PortForwardingEventListener 
listener) {
+        listeners.add(PortForwardingEventListener.validateListener(listener));
+    }
+
+    @Override
+    public void removePortForwardingEventListener(PortForwardingEventListener 
listener) {
+        if (listener == null) {
+            return;
+        }
+
+        
listeners.remove(PortForwardingEventListener.validateListener(listener));
+    }
+
+    @Override
+    public Collection<PortForwardingEventListenerManager> 
getRegisteredManagers() {
+        return managersHolder.isEmpty() ? Collections.emptyList() : new 
ArrayList<>(managersHolder);
+    }
+
+    @Override
+    public boolean 
addPortForwardingEventListenerManager(PortForwardingEventListenerManager 
manager) {
+        return managersHolder.add(Objects.requireNonNull(manager, "No 
manager"));
+    }
+
+    @Override
+    public boolean 
removePortForwardingEventListenerManager(PortForwardingEventListenerManager 
manager) {
+        if (manager == null) {
+            return false;
+        }
+
+        return managersHolder.remove(manager);
+    }
+
+    @Override
+    public Session getSession() {
+        return sessionInstance;
+    }
+
+    public final ConnectionService getConnectionService() {
+        return service;
+    }
+
+    protected Collection<PortForwardingEventListener> getDefaultListeners() {
+        Collection<PortForwardingEventListener> defaultListeners = new 
ArrayList<>();
+        defaultListeners.add(getPortForwardingEventListenerProxy());
+
+        Session session = getSession();
+        PortForwardingEventListener l = 
session.getPortForwardingEventListenerProxy();
+        if (l != null) {
+            defaultListeners.add(l);
+        }
+
+        FactoryManager manager = (session == null) ? null : 
session.getFactoryManager();
+        l = (manager == null) ? null : 
manager.getPortForwardingEventListenerProxy();
+        if (l != null) {
+            defaultListeners.add(l);
+        }
+
+        return defaultListeners;
+    }
+
+    @Override
+    public synchronized SshdSocketAddress 
startLocalPortForwarding(SshdSocketAddress local, SshdSocketAddress remote) 
throws IOException {
+        Objects.requireNonNull(local, "Local address is null");
+        ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: 
%s", local);
+        Objects.requireNonNull(remote, "Remote address is null");
+
+        if (isClosed()) {
+            throw new IllegalStateException("TcpipForwarder is closed");
+        }
+        if (isClosing()) {
+            throw new IllegalStateException("TcpipForwarder is closing");
+        }
+
+        InetSocketAddress bound;
+        int port;
+        signalEstablishingExplicitTunnel(local, remote, true);
+        try {
+            bound = doBind(local, staticIoHandlerFactory);
+            port = bound.getPort();
+            SshdSocketAddress prev;
+            synchronized (localToRemote) {
+                prev = localToRemote.put(port, remote);
+            }
+
+            if (prev != null) {
+                throw new IOException("Multiple local port forwarding bindings 
on port=" + port + ": current=" + remote + ", previous=" + prev);
+            }
+        } catch (IOException | RuntimeException e) {
+            try {
+                stopLocalPortForwarding(local);
+            } catch (IOException | RuntimeException err) {
+                e.addSuppressed(err);
+            }
+            signalEstablishedExplicitTunnel(local, remote, true, null, e);
+            throw e;
+        }
+
+        try {
+            SshdSocketAddress result = new 
SshdSocketAddress(bound.getHostString(), port);
+            if (log.isDebugEnabled()) {
+                log.debug("startLocalPortForwarding(" + local + " -> " + 
remote + "): " + result);
+            }
+            signalEstablishedExplicitTunnel(local, remote, true, result, null);
+            return result;
+        } catch (IOException | RuntimeException e) {
+            stopLocalPortForwarding(local);
+            throw e;
+        }
+    }
+
+    /**
+     * Removes the mapping registered for the local port and unbinds the local
+     * listener, notifying the port forwarding listeners before and after.
+     *
+     * @param local The local address whose port identifies the forwarding
+     * @throws IOException If a listener failed with an {@link IOException}
+     */
+    @Override
+    public synchronized void stopLocalPortForwarding(SshdSocketAddress local) throws IOException {
+        Objects.requireNonNull(local, "Local address is null");
+
+        // NOTE: the map value is the REMOTE target of the tunnel, not the bound address
+        SshdSocketAddress remote;
+        synchronized (localToRemote) {
+            remote = localToRemote.remove(local.getPort());
+        }
+
+        if ((remote != null) && (acceptor != null)) {
+            if (log.isDebugEnabled()) {
+                log.debug("stopLocalPortForwarding(" + local + ") unbind - was forwarding to " + remote);
+            }
+
+            // The acceptor listens on the local address, so that is what must be
+            // unbound and reported (same as stopDynamicPortForwarding does)
+            signalTearingDownExplicitTunnel(local, true);
+            try {
+                acceptor.unbind(local.toInetSocketAddress());
+            } catch (RuntimeException e) {
+                signalTornDownExplicitTunnel(local, true, e);
+                throw e;
+            }
+
+            signalTornDownExplicitTunnel(local, true, null);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("stopLocalPortForwarding(" + local + ") no mapping/acceptor for " + remote);
+            }
+        }
+    }
+
+    @Override
+    public synchronized SshdSocketAddress 
startRemotePortForwarding(SshdSocketAddress remote, SshdSocketAddress local) 
throws IOException {
+        Objects.requireNonNull(local, "Local address is null");
+        Objects.requireNonNull(remote, "Remote address is null");
+
+        String remoteHost = remote.getHostName();
+        int remotePort = remote.getPort();
+        Session session = getSession();
+        Buffer buffer = 
session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, remoteHost.length() + 
Long.SIZE);
+        buffer.putString("tcpip-forward");
+        buffer.putBoolean(true);    // want reply
+        buffer.putString(remoteHost);
+        buffer.putInt(remotePort);
+
+        long timeout = session.getLongProperty(FORWARD_REQUEST_TIMEOUT, 
DEFAULT_FORWARD_REQUEST_TIMEOUT);
+        Buffer result;
+        int port;
+        signalEstablishingExplicitTunnel(local, remote, false);
+        try {
+            result = session.request("tcpip-forward", buffer, timeout, 
TimeUnit.MILLISECONDS);
+            if (result == null) {
+                throw new SshException("Tcpip forwarding request denied by 
server");
+            }
+            port = (remotePort == 0) ? result.getInt() : remote.getPort();
+            // TODO: Is it really safe to only store the local address after 
the request ?
+            SshdSocketAddress prev;
+            synchronized (remoteToLocal) {
+                prev = remoteToLocal.put(port, local);
+            }
+
+            if (prev != null) {
+                throw new IOException("Multiple remote port forwarding 
bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+            }
+        } catch (IOException | RuntimeException e) {
+            try {
+                stopRemotePortForwarding(remote);
+            } catch (IOException | RuntimeException err) {
+                e.addSuppressed(err);
+            }
+            signalEstablishedExplicitTunnel(local, remote, false, null, e);
+            throw e;
+        }
+
+        try {
+            SshdSocketAddress bound = new SshdSocketAddress(remoteHost, port);
+            if (log.isDebugEnabled()) {
+                log.debug("startRemotePortForwarding(" + remote + " -> " + 
local + "): " + bound);
+            }
+
+            signalEstablishedExplicitTunnel(local, remote, false, bound, null);
+            return bound;
+        } catch (IOException | RuntimeException e) {
+            stopRemotePortForwarding(remote);
+            throw e;
+        }
+    }
+
+    /**
+     * Removes the mapping registered for the remote port and, if there was one,
+     * sends a &quot;cancel-tcpip-forward&quot; global request (no reply wanted),
+     * notifying the port forwarding listeners before and after.
+     *
+     * @param remote The remote address whose forwarding is cancelled
+     * @throws IOException If failed to write the request or a listener failed
+     */
+    @Override
+    public synchronized void stopRemotePortForwarding(SshdSocketAddress remote) throws IOException {
+        SshdSocketAddress target;
+        synchronized (remoteToLocal) {
+            target = remoteToLocal.remove(remote.getPort());
+        }
+
+        if (target == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("stopRemotePortForwarding(" + remote + ") no binding found");
+            }
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("stopRemotePortForwarding(" + remote + ") cancel forwarding to " + target);
+        }
+
+        String host = remote.getHostName();
+        Session session = getSession();
+        Buffer request = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, host.length() + Long.SIZE);
+        request.putString("cancel-tcpip-forward");
+        request.putBoolean(false);   // want reply
+        request.putString(host);
+        request.putInt(remote.getPort());
+
+        // NOTE(review): the listeners receive the mapped (local) address here - confirm that is intended
+        signalTearingDownExplicitTunnel(target, false);
+        try {
+            session.writePacket(request);
+        } catch (IOException | RuntimeException e) {
+            signalTornDownExplicitTunnel(target, false, e);
+            throw e;
+        }
+
+        signalTornDownExplicitTunnel(target, false, null);
+    }
+
+    protected void signalTearingDownExplicitTunnel(SshdSocketAddress 
boundAddress, boolean localForwarding) throws IOException {
+        try {
+            invokePortEventListenerSignaller(l -> {
+                signalTearingDownExplicitTunnel(l, boundAddress, 
localForwarding);
+                return null;
+            });
+        } catch (Throwable t) {
+            if (t instanceof RuntimeException) {
+                throw (RuntimeException) t;
+            } else if (t instanceof Error) {
+                throw (Error) t;
+            } else if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException("Failed (" + 
t.getClass().getSimpleName() + ")"
+                        + " to signal tearing down explicit tunnel for local=" 
+ localForwarding
+                        + " on bound=" + boundAddress, t);
+            }
+        }
+    }
+
+    /**
+     * Notifies a single listener that an explicit tunnel is about to be torn down.
+     *
+     * @param listener        The listener to notify - ignored if {@code null}
+     * @param boundAddress    The address passed on to the listener
+     * @param localForwarding Local/remote forwarding indicator
+     * @throws IOException If the listener failed
+     */
+    protected void signalTearingDownExplicitTunnel(
+            PortForwardingEventListener listener, SshdSocketAddress boundAddress, boolean localForwarding)
+                    throws IOException {
+        if (listener != null) {
+            listener.tearingDownExplicitTunnel(getSession(), boundAddress, localForwarding);
+        }
+    }
+
+    protected void signalTornDownExplicitTunnel(SshdSocketAddress 
boundAddress, boolean localForwarding, Throwable reason) throws IOException {
+        try {
+            invokePortEventListenerSignaller(l -> {
+                signalTornDownExplicitTunnel(l, boundAddress, localForwarding, 
reason);
+                return null;
+            });
+        } catch (Throwable t) {
+            if (t instanceof RuntimeException) {
+                throw (RuntimeException) t;
+            } else if (t instanceof Error) {
+                throw (Error) t;
+            } else if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException("Failed (" + 
t.getClass().getSimpleName() + ")"
+                        + " to signal torn down explicit tunnel local=" + 
localForwarding
+                        + " on bound=" + boundAddress, t);
+            }
+        }
+    }
+
+    /**
+     * Notifies a single listener that an explicit tunnel has been torn down.
+     *
+     * @param listener        The listener to notify - ignored if {@code null}
+     * @param boundAddress    The address passed on to the listener
+     * @param localForwarding Local/remote forwarding indicator
+     * @param reason          The failure that occurred - {@code null} if none
+     * @throws IOException If the listener failed
+     */
+    protected void signalTornDownExplicitTunnel(
+            PortForwardingEventListener listener, SshdSocketAddress boundAddress, boolean localForwarding, Throwable reason)
+                    throws IOException {
+        if (listener != null) {
+            listener.tornDownExplicitTunnel(getSession(), boundAddress, localForwarding, reason);
+        }
+    }
+
+    /**
+     * Binds a local listener served by the SOCKS proxy handler and registers
+     * a new {@link SocksProxy} for the bound port.
+     *
+     * @param local The requested local bind address - its port must be non-negative
+     * @return The actually bound address (host + port reported by the acceptor)
+     * @throws IOException If failed to bind, if the bound port already has a
+     * proxy, or if a listener failed with an {@link IOException}
+     * @throws IllegalStateException If the forwarder is closed or closing
+     */
+    @Override
+    public synchronized SshdSocketAddress startDynamicPortForwarding(SshdSocketAddress local) throws IOException {
+        Objects.requireNonNull(local, "Local address is null");
+        ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local);
+
+        if (isClosed()) {
+            throw new IllegalStateException("TcpipForwarder is closed");
+        }
+        if (isClosing()) {
+            throw new IllegalStateException("TcpipForwarder is closing");
+        }
+
+        SocksProxy socksProxy = new SocksProxy(service);
+        SocksProxy prev;
+        InetSocketAddress bound;
+        int port;
+        signalEstablishingDynamicTunnel(local);
+        try {
+            bound = doBind(local, socksProxyIoHandlerFactory);
+            port = bound.getPort();
+            synchronized (dynamicLocal) {
+                prev = dynamicLocal.put(port, socksProxy);
+            }
+
+            if (prev != null) {
+                throw new IOException("Multiple dynamic port mappings found for port=" + port + ": current=" + socksProxy + ", previous=" + prev);
+            }
+        } catch (IOException | RuntimeException e) {
+            try {
+                stopDynamicPortForwarding(local);
+            } catch (IOException | RuntimeException err) {
+                e.addSuppressed(err);
+            }
+            signalEstablishedDynamicTunnel(local, null, e);
+            throw e;
+        }
+
+        try {
+            SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
+            if (log.isDebugEnabled()) {
+                log.debug("startDynamicPortForwarding(" + local + "): " + result);
+            }
+
+            signalEstablishedDynamicTunnel(local, result, null);
+            return result;
+        } catch (IOException | RuntimeException e) {
+            // Roll back using the port that was actually bound: the proxy is keyed
+            // by it, and it differs from the requested one when port 0 was requested.
+            // A rollback failure must not hide the original problem.
+            try {
+                stopDynamicPortForwarding(new SshdSocketAddress(bound.getHostString(), port));
+            } catch (IOException | RuntimeException err) {
+                e.addSuppressed(err);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Notifies all the port forwarding listeners about the outcome of a
+     * dynamic tunnel establishment attempt.
+     *
+     * @param local        The requested local address
+     * @param boundAddress The bound address - callers pass {@code null} on failure
+     * @param reason       The failure that occurred - {@code null} if none
+     * @throws IOException If a listener failed with it, or with any other
+     * checked failure (which is then wrapped)
+     */
+    protected void signalEstablishedDynamicTunnel(
+            SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException {
+        try {
+            invokePortEventListenerSignaller(l -> {
+                signalEstablishedDynamicTunnel(l, local, boundAddress, reason);
+                return null;
+            });
+        } catch (Throwable t) {
+            if (t instanceof RuntimeException) {
+                throw (RuntimeException) t;
+            } else if (t instanceof Error) {
+                throw (Error) t;
+            } else if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                // message used to say "establishing" - a copy/paste slip from the sibling signaller
+                throw new IOException("Failed (" + t.getClass().getSimpleName() + ")"
+                        + " to signal established dynamic tunnel for local=" + local
+                        + " on bound=" + boundAddress, t);
+            }
+        }
+    }
+
+    /**
+     * Notifies a single listener about the outcome of a dynamic tunnel
+     * establishment attempt.
+     *
+     * @param listener     The listener to notify - ignored if {@code null}
+     * @param local        The requested local address
+     * @param boundAddress The bound address passed on to the listener
+     * @param reason       The failure that occurred - {@code null} if none
+     * @throws IOException If the listener failed
+     */
+    protected void signalEstablishedDynamicTunnel(PortForwardingEventListener listener,
+                SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException {
+        if (listener != null) {
+            listener.establishedDynamicTunnel(getSession(), local, boundAddress, reason);
+        }
+    }
+
+    protected void signalEstablishingDynamicTunnel(SshdSocketAddress local) 
throws IOException {
+        try {
+            invokePortEventListenerSignaller(l -> {
+                signalEstablishingDynamicTunnel(l, local);
+                return null;
+            });
+        } catch (Throwable t) {
+            if (t instanceof RuntimeException) {
+                throw (RuntimeException) t;
+            } else if (t instanceof Error) {
+                throw (Error) t;
+            } else if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException("Failed (" + 
t.getClass().getSimpleName() + ")"
+                        + " to signal establishing dynamic tunnel for local=" 
+ local, t);
+            }
+        }
+    }
+
+    /**
+     * Notifies a single listener that a dynamic tunnel is about to be established.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param local    The requested local address
+     * @throws IOException If the listener failed
+     */
+    protected void signalEstablishingDynamicTunnel(PortForwardingEventListener listener, SshdSocketAddress local)
+            throws IOException {
+        if (listener != null) {
+            listener.establishingDynamicTunnel(getSession(), local);
+        }
+    }
+
+    @Override
+    public synchronized void stopDynamicPortForwarding(SshdSocketAddress 
local) throws IOException {
+        SocksProxy obj;
+        synchronized (dynamicLocal) {
+            obj = dynamicLocal.remove(local.getPort());
+        }
+
+        if (obj != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("stopDynamicPortForwarding(" + local + ") 
unbinding");
+            }
+
+            signalTearingDownDynamicTunnel(local);
+            try {
+                obj.close(true);
+                acceptor.unbind(local.toInetSocketAddress());
+            } catch (RuntimeException e) {
+                signalTornDownDynamicTunnel(local, e);
+                throw e;
+            }
+
+            signalTornDownDynamicTunnel(local, null);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("stopDynamicPortForwarding(" + local + ") no binding 
found");
+            }
+        }
+    }
+
+    protected void signalTearingDownDynamicTunnel(SshdSocketAddress address) 
throws IOException {
+        try {
+            invokePortEventListenerSignaller(l -> {
+                signalTearingDownDynamicTunnel(l, address);
+                return null;
+            });
+        } catch (Throwable t) {
+            if (t instanceof RuntimeException) {
+                throw (RuntimeException) t;
+            } else if (t instanceof Error) {
+                throw (Error) t;
+            } else if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException("Failed (" + 
t.getClass().getSimpleName() + ")"
+                        + " to signal tearing down dynamic tunnel for 
address=" + address, t);
+            }
+        }
+    }
+
+    /**
+     * Notifies a single listener that a dynamic tunnel is about to be torn down.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param address  The address passed on to the listener
+     * @throws IOException If the listener failed
+     */
+    protected void signalTearingDownDynamicTunnel(PortForwardingEventListener listener, SshdSocketAddress address)
+            throws IOException {
+        if (listener != null) {
+            listener.tearingDownDynamicTunnel(getSession(), address);
+        }
+    }
+
+    protected void signalTornDownDynamicTunnel(SshdSocketAddress address, 
Throwable reason) throws IOException {
+        try {
+            invokePortEventListenerSignaller(l -> {
+                signalTornDownDynamicTunnel(l, address, reason);
+                return null;
+            });
+        } catch (Throwable t) {
+            if (t instanceof RuntimeException) {
+                throw (RuntimeException) t;
+            } else if (t instanceof Error) {
+                throw (Error) t;
+            } else if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException("Failed (" + 
t.getClass().getSimpleName() + ")"
+                        + " to signal torn down dynamic tunnel for address=" + 
address, t);
+            }
+        }
+    }
+
+    /**
+     * Notifies a single listener that a dynamic tunnel has been torn down.
+     *
+     * @param listener The listener to notify - ignored if {@code null}
+     * @param address  The address passed on to the listener
+     * @param reason   The failure that occurred - {@code null} if none
+     * @throws IOException If the listener failed
+     */
+    protected void signalTornDownDynamicTunnel(
+            PortForwardingEventListener listener, SshdSocketAddress address, Throwable reason)
+                    throws IOException {
+        if (listener != null) {
+            listener.tornDownDynamicTunnel(getSession(), address, reason);
+        }
+    }
+
+    @Override
+    public synchronized SshdSocketAddress getForwardedPort(int remotePort) {
+        synchronized (remoteToLocal) {
+            return remoteToLocal.get(remotePort);
+        }
+    }
+
+    /**
+     * Handles a request to listen on a local address: consults the
+     * {@link TcpForwardingFilter} and, if allowed, binds the address and
+     * records the forwarding entry.
+     *
+     * @param local The requested bind address - its port must be non-negative
+     * @return The actually bound address - {@code null} if there is no filter
+     * or the filter rejected the request
+     * @throws IOException If failed to bind, to record the entry, or if a
+     * listener failed with an {@link IOException}
+     */
+    @Override
+    public synchronized SshdSocketAddress localPortForwardingRequested(SshdSocketAddress local) throws IOException {
+        Objects.requireNonNull(local, "Local address is null");
+        ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local);
+
+        Session session = getSession();
+        FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
+        TcpForwardingFilter filter = manager.getTcpForwardingFilter();
+        try {
+            if ((filter == null) || (!filter.canListen(local, session))) {
+                if (log.isDebugEnabled()) {
+                    log.debug("localPortForwardingRequested(" + session + ")[" + local + "][haveFilter=" + (filter != null) + "] rejected");
+                }
+                return null;
+            }
+        } catch (Error e) {
+            log.warn("localPortForwardingRequested({})[{}] failed ({}) to consult forwarding filter: {}",
+                     session, local, e.getClass().getSimpleName(), e.getMessage());
+            if (log.isDebugEnabled()) {
+                log.debug("localPortForwardingRequested(" + this + ")[" + local + "] filter consultation failure details", e);
+            }
+            throw new RuntimeSshException(e);
+        }
+
+        signalEstablishingExplicitTunnel(local, null, true);
+        SshdSocketAddress result;
+        try {
+            InetSocketAddress bound = doBind(local, staticIoHandlerFactory);
+            result = new SshdSocketAddress(bound.getHostString(), bound.getPort());
+            if (log.isDebugEnabled()) {
+                log.debug("localPortForwardingRequested(" + local + "): " + result);
+            }
+
+            boolean added;
+            synchronized (localForwards) {
+                // NOTE !!! it is crucial to use the bound address host name first
+                added = localForwards.add(new LocalForwardingEntry(result.getHostName(), local.getHostName(), result.getPort()));
+            }
+
+            if (!added) {
+                throw new IOException("Failed to add local port forwarding entry for " + local + " -> " + result);
+            }
+        } catch (IOException | RuntimeException e) {
+            try {
+                localPortForwardingCancelled(local);
+            } catch (IOException | RuntimeException err) {
+                // attach the cleanup failure - attaching "e" to itself is rejected
+                // by Throwable#addSuppressed and would replace the real error
+                e.addSuppressed(err);
+            }
+            signalEstablishedExplicitTunnel(local, null, true, null, e);
+            throw e;
+        }
+
+        signalEstablishedExplicitTunnel(local, null, true, result, null);
+        return result;
+    }
+
+    /**
+     * Removes the forwarding entry matching the given host/port and, if one
+     * was found and there is an acceptor, unbinds the entry's address,
+     * notifying the port forwarding listeners before and after.
+     *
+     * @param local The address whose forwarding request is cancelled
+     * @throws IOException If a listener failed with an {@link IOException}
+     */
+    @Override
+    public synchronized void localPortForwardingCancelled(SshdSocketAddress local) throws IOException {
+        LocalForwardingEntry match;
+        synchronized (localForwards) {
+            match = LocalForwardingEntry.findMatchingEntry(local.getHostName(), local.getPort(), localForwards);
+            if (match != null) {
+                localForwards.remove(match);
+            }
+        }
+
+        if ((match == null) || (acceptor == null)) {
+            if (log.isDebugEnabled()) {
+                log.debug("localPortForwardingCancelled(" + local + ") no match/acceptor: " + match);
+            }
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("localPortForwardingCancelled(" + local + ") unbind " + match);
+        }
+
+        signalTearingDownExplicitTunnel(match, true);
+        try {
+            acceptor.unbind(match.toInetSocketAddress());
+        } catch (RuntimeException e) {
+            signalTornDownExplicitTunnel(match, true, e);
+            throw e;
+        }
+
+        signalTornDownExplicitTunnel(match, true, null);
+    }
+
+    protected void signalEstablishingExplicitTunnel(
+            SshdSocketAddress local, SshdSocketAddress remote, boolean 
localForwarding)
+                    throws IOException {
+        try {
+            invokePortEventListenerSignaller(l -> {
+                signalEstablishingExplicitTunnel(l, local, remote, 
localForwarding);
+                return null;
+            });
+        } catch (Throwable t) {
+            if (t instanceof RuntimeException) {
+                throw (RuntimeException) t;
+            } else if (t instanceof Error) {
+                throw (Error) t;
+            } else if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException("Failed (" + 
t.getClass().getSimpleName() + ")"
+                        + " to signal establishing explicit tunnel for local=" 
+ local
+                        + ", remote=" + remote + ", localForwarding=" + 
localForwarding, t);
+            }
+        }
+    }
+
+    /**
+     * Notifies a single listener that an explicit tunnel is about to be established.
+     *
+     * @param listener        The listener to notify - ignored if {@code null}
+     * @param local           The local address
+     * @param remote          The remote address
+     * @param localForwarding Local/remote forwarding indicator
+     * @throws IOException If the listener failed
+     */
+    protected void signalEstablishingExplicitTunnel(PortForwardingEventListener listener,
+            SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+                    throws IOException {
+        if (listener != null) {
+            listener.establishingExplicitTunnel(getSession(), local, remote, localForwarding);
+        }
+    }
+
+    protected void signalEstablishedExplicitTunnel(
+            SshdSocketAddress local, SshdSocketAddress remote, boolean 
localForwarding,
+            SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException {
+        try {
+            invokePortEventListenerSignaller(l -> {
+                signalEstablishedExplicitTunnel(l, local, remote, 
localForwarding, boundAddress, reason);
+                return null;
+            });
+        } catch (Throwable t) {
+            if (t instanceof RuntimeException) {
+                throw (RuntimeException) t;
+            } else if (t instanceof Error) {
+                throw (Error) t;
+            } else if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException("Failed (" + 
t.getClass().getSimpleName() + ")"
+                        + " to signal established explicit tunnel for local=" 
+ local
+                        + ", remote=" + remote + ", localForwarding=" + 
localForwarding
+                        + ", bound=" + boundAddress, t);
+            }
+        }
+    }
+
+    /**
+     * Notifies a single listener about the outcome of an explicit tunnel
+     * establishment attempt.
+     *
+     * @param listener        The listener to notify - ignored if {@code null}
+     * @param local           The local address
+     * @param remote          The remote address
+     * @param localForwarding Local/remote forwarding indicator
+     * @param boundAddress    The bound address passed on to the listener
+     * @param reason          The failure that occurred - {@code null} if none
+     * @throws IOException If the listener failed
+     */
+    protected void signalEstablishedExplicitTunnel(PortForwardingEventListener listener,
+            SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding,
+            SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException {
+        if (listener != null) {
+            listener.establishedExplicitTunnel(getSession(), local, remote, localForwarding, boundAddress, reason);
+        }
+    }
+
+    /**
+     * Applies the invoker first to the default listeners and then to the
+     * registered managers. A failure in the first group does not prevent the
+     * second one from being invoked - failures are accumulated and thrown at the end.
+     *
+     * @param invoker The notification to apply to each listener
+     * @throws Throwable The accumulated failure(s) - if any
+     */
+    protected void invokePortEventListenerSignaller(Invoker<PortForwardingEventListener, Void> invoker)
+            throws Throwable {
+        Throwable failure = null;
+        try {
+            invokePortEventListenerSignallerListeners(getDefaultListeners(), invoker);
+        } catch (Throwable t) {
+            failure = GenericUtils.accumulateException(failure, GenericUtils.peelException(t));
+        }
+
+        try {
+            invokePortEventListenerSignallerHolders(managersHolder, invoker);
+        } catch (Throwable t) {
+            failure = GenericUtils.accumulateException(failure, GenericUtils.peelException(t));
+        }
+
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Applies the invoker to every non-{@code null} listener. A failing
+     * listener does not stop the iteration - failures are accumulated and
+     * thrown after all the listeners have been invoked.
+     *
+     * @param listeners The listeners to notify - may be {@code null}/empty
+     * @param invoker   The notification to apply to each listener
+     * @throws Throwable The accumulated failure(s) - if any
+     */
+    protected void invokePortEventListenerSignallerListeners(
+            Collection<? extends PortForwardingEventListener> listeners, Invoker<PortForwardingEventListener, Void> invoker)
+                    throws Throwable {
+        if (GenericUtils.isEmpty(listeners)) {
+            return;
+        }
+
+        Throwable failure = null;
+        // Need to go over the hierarchy (session, factory managed, connection service, etc...)
+        for (PortForwardingEventListener candidate : listeners) {
+            if (candidate != null) {
+                try {
+                    invoker.invoke(candidate);
+                } catch (Throwable t) {
+                    failure = GenericUtils.accumulateException(failure, GenericUtils.peelException(t));
+                }
+            }
+        }
+
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Applies the invoker to the listener proxy of every manager and then
+     * recurses into the managers registered with it (if the manager is itself
+     * a holder). Failures are accumulated and thrown after all the managers
+     * have been visited.
+     *
+     * @param holders The managers to visit - may be {@code null}/empty
+     * @param invoker The notification to apply to each listener proxy
+     * @throws Throwable The accumulated failure(s) - if any
+     */
+    protected void invokePortEventListenerSignallerHolders(
+            Collection<? extends PortForwardingEventListenerManager> holders, Invoker<PortForwardingEventListener, Void> invoker)
+                    throws Throwable {
+        if (GenericUtils.isEmpty(holders)) {
+            return;
+        }
+
+        Throwable failure = null;
+        // Need to go over the hierarchy (session, factory managed, connection service, etc...)
+        for (PortForwardingEventListenerManager manager : holders) {
+            try {
+                PortForwardingEventListener proxy = manager.getPortForwardingEventListenerProxy();
+                if (proxy != null) {
+                    invoker.invoke(proxy);
+                }
+            } catch (Throwable t) {
+                failure = GenericUtils.accumulateException(failure, GenericUtils.peelException(t));
+            }
+
+            if (!(manager instanceof PortForwardingEventListenerManagerHolder)) {
+                continue;
+            }
+
+            try {
+                PortForwardingEventListenerManagerHolder nested = (PortForwardingEventListenerManagerHolder) manager;
+                invokePortEventListenerSignallerHolders(nested.getRegisteredManagers(), invoker);
+            } catch (Throwable t) {
+                failure = GenericUtils.accumulateException(failure, GenericUtils.peelException(t));
+            }
+        }
+
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * @return A {@link Closeable} built from the registered dynamic (SOCKS)
+     * proxies (added via {@code parallel}) followed by the acceptor
+     */
+    @Override
+    protected synchronized Closeable getInnerCloseable() {
+        return builder()
+                .parallel(dynamicLocal.values())
+                .close(acceptor)
+                .build();
+    }
+
+    /**
+     * Drops all the registered listeners and managers before delegating to
+     * the inherited pre-close logic.
+     */
+    @Override
+    protected void preClose() {
+        listeners.clear();
+        managersHolder.clear();
+        super.preClose();
+    }
+
+    /**
+     * Binds the requested address, lazily creating the acceptor on first use,
+     * and works out the resulting bound address by comparing the acceptor's
+     * bound addresses before and after the bind.
+     *
+     * @param address        The request bind address
+     * @param handlerFactory A {@link Factory} to create an {@link IoHandler} if necessary
+     * @return The {@link InetSocketAddress} to which the binding occurred
+     * @throws IOException If failed to bind, or if the bind did not yield
+     * exactly one new bound address
+     */
+    private InetSocketAddress doBind(SshdSocketAddress address, Factory<? extends IoHandler> handlerFactory)
+            throws IOException {
+        if (acceptor == null) {
+            Session session = getSession();
+            FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
+            IoServiceFactory ioFactory = Objects.requireNonNull(manager.getIoServiceFactory(), "No I/O service factory");
+            acceptor = ioFactory.createAcceptor(handlerFactory.create());
+        }
+
+        // TODO find a better way to determine the resulting bind address - what if multi-threaded calls...
+        Set<SocketAddress> previouslyBound = acceptor.getBoundAddresses();
+        try {
+            InetSocketAddress bindAddress = address.toInetSocketAddress();
+            acceptor.bind(bindAddress);
+
+            Set<SocketAddress> newlyBound = acceptor.getBoundAddresses();
+            if (GenericUtils.size(newlyBound) > 0) {
+                // keep only what this bind call has added
+                newlyBound.removeAll(previouslyBound);
+            }
+            if (GenericUtils.isEmpty(newlyBound)) {
+                throw new IOException("Error binding to " + address + "[" + bindAddress + "]: no local addresses bound");
+            }
+
+            if (newlyBound.size() > 1) {
+                throw new IOException("Multiple local addresses have been bound for " + address + "[" + bindAddress + "]");
+            }
+
+            SocketAddress effective = newlyBound.iterator().next();
+            return (InetSocketAddress) effective;
+        } catch (IOException bindErr) {
+            // nothing left bound - close this forwarder
+            Set<SocketAddress> remaining = acceptor.getBoundAddresses();
+            if (GenericUtils.isEmpty(remaining)) {
+                close();
+            }
+            throw bindErr;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "[" + getSession() + "]";
+    }
+
+    //
+    // Static IoHandler implementation
+    //
+
+    class StaticIoHandler implements IoHandler {
+        StaticIoHandler() {
+            super();
+        }
+
+        @Override
+        @SuppressWarnings("synthetic-access")
+        public void sessionCreated(final IoSession session) throws Exception {
+            InetSocketAddress local = (InetSocketAddress) 
session.getLocalAddress();
+            int localPort = local.getPort();
+            SshdSocketAddress remote = localToRemote.get(localPort);
+            if (log.isDebugEnabled()) {
+                log.debug("sessionCreated({}) remote={}", session, remote);
+            }
+
+            final TcpipClientChannel channel;
+            if (remote != null) {
+                channel = new 
TcpipClientChannel(TcpipClientChannel.Type.Direct, session, remote);
+            } else {
+                channel = new 
TcpipClientChannel(TcpipClientChannel.Type.Forwarded, session, null);
+            }
+            session.setAttribute(TcpipClientChannel.class, channel);
+
+            service.registerChannel(channel);
+            channel.open().addListener(future -> {
+                Throwable t = future.getException();
+                if (t != null) {
+                    log.warn("Failed ({}) to open channel for session={}: {}",
+                             t.getClass().getSimpleName(), session, 
t.getMessage());
+                    if (log.isDebugEnabled()) {
+                        log.debug("sessionCreated(" + session + ") channel=" + 
channel + " open failure details", t);
+                    }
+                    
DefaultForwardingFilter.this.service.unregisterChannel(channel);
+                    channel.close(false);
+                }
+            });
+        }
+
+        @Override
+        @SuppressWarnings("synthetic-access")
+        public void sessionClosed(IoSession session) throws Exception {
+            TcpipClientChannel channel = (TcpipClientChannel) 
session.getAttribute(TcpipClientChannel.class);
+            if (channel != null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("sessionClosed({}) closing channel={}", session, 
channel);
+                }
+                channel.close(false);
+            }
+        }
+
+        @Override
+        @SuppressWarnings("synthetic-access")
+        public void messageReceived(IoSession session, Readable message) 
throws Exception {
+            TcpipClientChannel channel = (TcpipClientChannel) 
session.getAttribute(TcpipClientChannel.class);
+            Buffer buffer = new ByteArrayBuffer(message.available() + 
Long.SIZE, false);
+            buffer.putBuffer(message);
+
+            Collection<ClientChannelEvent> result = 
channel.waitFor(STATIC_IO_MSG_RECEIVED_EVENTS, Long.MAX_VALUE);
+            if (log.isTraceEnabled()) {
+                log.trace("messageReceived({}) channel={}, len={} wait result: 
{}",
+                          session, channel, result, buffer.array());
+            }
+
+            OutputStream outputStream = channel.getInvertedIn();
+            outputStream.write(buffer.array(), buffer.rpos(), 
buffer.available());
+            outputStream.flush();
+        }
+
+        @Override
+        @SuppressWarnings("synthetic-access")
+        public void exceptionCaught(IoSession session, Throwable cause) throws 
Exception {
+            if (log.isDebugEnabled()) {
+                log.debug("exceptionCaught({}) {}: {}", session, 
cause.getClass().getSimpleName(), cause.getMessage());
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("exceptionCaught(" + session + ") caught exception 
details", cause);
+            }
+            session.close(false);
+        }
+    }
+}

Reply via email to