Repository: mina-sshd
Updated Branches:
  refs/heads/master 251db9b9d -> 721f399bd


[SSHD-721] I/O workers exhaustion in tcpip forwarding


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/ca578eb1
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/ca578eb1
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/ca578eb1

Branch: refs/heads/master
Commit: ca578eb1c74d94ad89e2d4ab0d94489eace7a0c6
Parents: 3c9efa8
Author: Guillaume Nodet <gno...@apache.org>
Authored: Sun Apr 15 21:01:09 2018 +0200
Committer: Guillaume Nodet <gno...@apache.org>
Committed: Sun Apr 15 21:01:09 2018 +0200

----------------------------------------------------------------------
 .../common/forward/DefaultForwardingFilter.java | 24 +++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/ca578eb1/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
index 7936415..751f73a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.sshd.client.channel.ClientChannelEvent;
+import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
@@ -979,15 +980,22 @@ public class DefaultForwardingFilter
                           session, channel, totalMessages, message.available());
             }
 
-            Collection<ClientChannelEvent> result = channel.waitFor(STATIC_IO_MSG_RECEIVED_EVENTS, Long.MAX_VALUE);
-            if (traceEnabled) {
-                log.trace("messageReceived({}) channel={}, count={}, len={} wait result: {}",
-                          session, channel, totalMessages, message.available(), result);
+            OpenFuture future = channel.getOpenFuture();
+            if (future.isOpened()) {
+                OutputStream outputStream = channel.getInvertedIn();
+                outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
+                outputStream.flush();
+            } else {
+                future.addListener(f -> {
+                    try {
+                        OutputStream outputStream = channel.getInvertedIn();
+                        outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
+                        outputStream.flush();
+                    } catch (IOException e) {
+                        channel.getSession().exceptionCaught(e);
+                    }
+                });
             }
-
-            OutputStream outputStream = channel.getInvertedIn();
-            outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
-            outputStream.flush();
         }
 
         @Override

Reply via email to