Repository: mina-sshd
Updated Branches:
  refs/heads/master 251db9b9d -> 721f399bd
[SSHD-721] I/O workers exhaustion in tcpip forwarding

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/ca578eb1
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/ca578eb1
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/ca578eb1

Branch: refs/heads/master
Commit: ca578eb1c74d94ad89e2d4ab0d94489eace7a0c6
Parents: 3c9efa8
Author: Guillaume Nodet <gno...@apache.org>
Authored: Sun Apr 15 21:01:09 2018 +0200
Committer: Guillaume Nodet <gno...@apache.org>
Committed: Sun Apr 15 21:01:09 2018 +0200

----------------------------------------------------------------------
 .../common/forward/DefaultForwardingFilter.java | 24 +++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/ca578eb1/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
index 7936415..751f73a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.sshd.client.channel.ClientChannelEvent;
+import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
@@ -979,15 +980,22 @@ public class DefaultForwardingFilter
                         session, channel, totalMessages, message.available());
             }
 
-            Collection<ClientChannelEvent> result = channel.waitFor(STATIC_IO_MSG_RECEIVED_EVENTS, Long.MAX_VALUE);
-            if (traceEnabled) {
-                log.trace("messageReceived({}) channel={}, count={}, len={} wait result: {}",
-                        session, channel, totalMessages, message.available(), result);
+            OpenFuture future = channel.getOpenFuture();
+            if (future.isOpened()) {
+                OutputStream outputStream = channel.getInvertedIn();
+                outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
+                outputStream.flush();
+            } else {
+                future.addListener(f -> {
+                    try {
+                        OutputStream outputStream = channel.getInvertedIn();
+                        outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
+                        outputStream.flush();
+                    } catch (IOException e) {
+                        channel.getSession().exceptionCaught(e);
+                    }
+                });
             }
-
-            OutputStream outputStream = channel.getInvertedIn();
-            outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
-            outputStream.flush();
         }
 
         @Override