This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git


The following commit(s) were added to refs/heads/master by this push:
     new 45f84aa  [SSHD-1003] Use asynchronous streams when forwarding ports
45f84aa is described below

commit 45f84aab59b2e11d72942cffe9d810e37ab64959
Author: Guillaume Nodet <gno...@gmail.com>
AuthorDate: Thu Jun 4 10:15:54 2020 +0200

    [SSHD-1003] Use asynchronous streams when forwarding ports
---
 .../apache/sshd/common/io/nio2/Nio2Session.java    |  1 -
 .../sshd/server/forward/TcpipServerChannel.java    | 47 +++++++++-------------
 2 files changed, 20 insertions(+), 28 deletions(-)

diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index 1d5fc45..d595e90 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -368,7 +368,6 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
                     log.debug("handleReadCycleCompletion({}) Socket has been disconnected (result={}), closing IoSession now",
                             this, result);
                 }
-                close(true);
             }
         } catch (Throwable exc) {
             completionHandler.failed(exc, attachment);
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 6ec0629..aa07126 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -19,7 +19,6 @@
 package org.apache.sshd.server.forward;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.SocketAddress;
 import java.util.Collections;
@@ -31,9 +30,10 @@ import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.BufferedIoOutputStream;
 import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.channel.ChannelFactory;
-import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.channel.Window;
 import org.apache.sshd.common.channel.exception.SshChannelOpenException;
 import org.apache.sshd.common.forward.ForwardingTunnelEndpointsProvider;
@@ -41,6 +41,7 @@ import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.io.IoConnectFuture;
 import org.apache.sshd.common.io.IoConnector;
 import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoServiceFactory;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.Session;
@@ -95,7 +96,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward
     private final ForwardingFilter.Type type;
     private IoConnector connector;
     private IoSession ioSession;
-    private OutputStream out;
+    private IoOutputStream out;
     private SshdSocketAddress tunnelEntrance;
     private SshdSocketAddress tunnelExit;
     private SshdSocketAddress originatorAddress;
@@ -196,9 +197,19 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward
             throw new RuntimeSshException(e);
         }
 
-        // TODO: revisit for better threading. Use async io ?
-        out = new ChannelOutputStream(
-                this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true);
+        out = new BufferedIoOutputStream(
+                "tcpip channel", new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
+                    @SuppressWarnings("synthetic-access")
+                    @Override
+                    protected CloseFuture doCloseGracefully() {
+                        try {
+                            sendEof();
+                        } catch (IOException e) {
+                            session.exceptionCaught(e);
+                        }
+                        return super.doCloseGracefully();
+                    }
+                });
         IoHandler handler = new IoHandler() {
             @Override
             @SuppressWarnings("synthetic-access")
@@ -208,10 +219,9 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward
                         log.debug("doInit({}) Ignoring write to channel in CLOSING state", TcpipServerChannel.this);
                     }
                 } else {
-                    Buffer buffer = new ByteArrayBuffer(message.available() + Long.SIZE, false);
+                    Buffer buffer = new ByteArrayBuffer(message.available(), false);
                     buffer.putBuffer(message);
-                    out.write(buffer.array(), buffer.rpos(), buffer.available());
-                    out.flush();
+                    out.writePacket(buffer);
                 }
             }
 
@@ -302,24 +312,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward
     @Override
     protected Closeable getInnerCloseable() {
         return builder()
-                .run(toString(), () -> {
-                    /*
-                     * In case of graceful shutdown (e.g. when the remote channel is gently closed) we also need to
-                     * close the ChannelOutputStream which flushes remaining buffer and sends SSH_MSG_CHANNEL_EOF back
-                     * to the client.
-                     */
-                    if (out != null) {
-                        try {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Closing channel output stream of {}", this);
-                            }
-                            out.close();
-                        } catch (IOException | RuntimeException ignored) {
-                            log.debug("{} while closing channel output stream of {}: {}",
-                                    ignored.getClass().getSimpleName(), this, ignored.getMessage(), ignored);
-                        }
-                    }
-                })
+                .close(out)
                 .close(super.getInnerCloseable())
                 .close(new AbstractCloseable() {
                     private final CloseableExecutorService executor

Reply via email to