This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push: new 45f84aa [SSHD-1003] Use asynchronous streams when forwarding ports 45f84aa is described below commit 45f84aab59b2e11d72942cffe9d810e37ab64959 Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Thu Jun 4 10:15:54 2020 +0200 [SSHD-1003] Use asynchronous streams when forwarding ports --- .../apache/sshd/common/io/nio2/Nio2Session.java | 1 - .../sshd/server/forward/TcpipServerChannel.java | 47 +++++++++------------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java index 1d5fc45..d595e90 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java @@ -368,7 +368,6 @@ public class Nio2Session extends AbstractCloseable implements IoSession { log.debug("handleReadCycleCompletion({}) Socket has been disconnected (result={}), closing IoSession now", this, result); } - close(true); } } catch (Throwable exc) { completionHandler.failed(exc, attachment); diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java index 6ec0629..aa07126 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java @@ -19,7 +19,6 @@ package org.apache.sshd.server.forward; import java.io.IOException; -import java.io.OutputStream; import java.net.ConnectException; import java.net.SocketAddress; import java.util.Collections; @@ -31,9 +30,10 @@ import org.apache.sshd.common.Closeable; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.RuntimeSshException; import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.channel.BufferedIoOutputStream; import org.apache.sshd.common.channel.Channel; +import org.apache.sshd.common.channel.ChannelAsyncOutputStream; import org.apache.sshd.common.channel.ChannelFactory; -import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.channel.exception.SshChannelOpenException; import org.apache.sshd.common.forward.ForwardingTunnelEndpointsProvider; @@ -41,6 +41,7 @@ import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.io.IoConnectFuture; import org.apache.sshd.common.io.IoConnector; import org.apache.sshd.common.io.IoHandler; +import org.apache.sshd.common.io.IoOutputStream; import org.apache.sshd.common.io.IoServiceFactory; import org.apache.sshd.common.io.IoSession; import org.apache.sshd.common.session.Session; @@ -95,7 +96,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward private final ForwardingFilter.Type type; private IoConnector connector; private IoSession ioSession; - private OutputStream out; + private IoOutputStream out; private SshdSocketAddress tunnelEntrance; private SshdSocketAddress tunnelExit; private SshdSocketAddress originatorAddress; @@ -196,9 +197,19 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward throw new RuntimeSshException(e); } - // TODO: revisit for better threading. Use async io ? - out = new ChannelOutputStream( - this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + out = new BufferedIoOutputStream( + "tcpip channel", new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) { + @SuppressWarnings("synthetic-access") + @Override + protected CloseFuture doCloseGracefully() { + try { + sendEof(); + } catch (IOException e) { + session.exceptionCaught(e); + } + return super.doCloseGracefully(); + } + }); IoHandler handler = new IoHandler() { @Override @SuppressWarnings("synthetic-access") @@ -208,10 +219,9 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward log.debug("doInit({}) Ignoring write to channel in CLOSING state", TcpipServerChannel.this); } } else { - Buffer buffer = new ByteArrayBuffer(message.available() + Long.SIZE, false); + Buffer buffer = new ByteArrayBuffer(message.available(), false); buffer.putBuffer(message); - out.write(buffer.array(), buffer.rpos(), buffer.available()); - out.flush(); + out.writePacket(buffer); } } @@ -302,24 +312,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward @Override protected Closeable getInnerCloseable() { return builder() - .run(toString(), () -> { - /* - * In case of graceful shutdown (e.g. when the remote channel is gently closed) we also need to - * close the ChannelOutputStream which flushes remaining buffer and sends SSH_MSG_CHANNEL_EOF back - * to the client. - */ - if (out != null) { - try { - if (log.isDebugEnabled()) { - log.debug("Closing channel output stream of {}", this); - } - out.close(); - } catch (IOException | RuntimeException ignored) { - log.debug("{} while closing channel output stream of {}: {}", - ignored.getClass().getSimpleName(), this, ignored.getMessage(), ignored); - } - } - }) + .close(out) .close(super.getInnerCloseable()) .close(new AbstractCloseable() { private final CloseableExecutorService executor