[SSHD-449] SSH Exec channel with ClientChannel.Streaming.Async

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/a4f4e281
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/a4f4e281
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/a4f4e281

Branch: refs/heads/master
Commit: a4f4e28112e83d2430f4431e9e07ebba07ddc484
Parents: a04f585
Author: Guillaume Nodet <[email protected]>
Authored: Mon May 28 10:38:10 2018 +0200
Committer: Guillaume Nodet <[email protected]>
Committed: Mon May 28 11:37:48 2018 +0200

----------------------------------------------------------------------
 .../common/channel/ChannelAsyncInputStream.java | 13 +++-
 .../java/org/apache/sshd/client/ClientTest.java | 63 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/a4f4e281/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
index e699540..e0c6c26 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -64,7 +64,18 @@ public class ChannelAsyncInputStream extends 
AbstractCloseable implements IoInpu
     public IoReadFuture read(Buffer buf) {
         IoReadFutureImpl future = new IoReadFutureImpl(readFutureId, buf);
         if (isClosing()) {
-            future.setValue(new IOException("Closed"));
+            synchronized (buffer) {
+                if (pending != null) {
+                    throw new ReadPendingException("Previous pending read not 
handled");
+                }
+                if (buffer.available() > 0) {
+                    int nbRead = future.buffer.putBuffer(buffer, false);
+                    buffer.compact();
+                    future.setValue(nbRead);
+                } else {
+                    future.setValue(new IOException("Closed"));
+                }
+            }
         } else {
             synchronized (buffer) {
                 if (pending != null) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/a4f4e281/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java 
b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
index 0026cc3..43d6d33 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -588,6 +588,69 @@ public class ClientTest extends BaseTestSupport {
     }
 
     @Test
+    public void testExecAsyncClient() throws Exception {
+        Logger log = LoggerFactory.getLogger(getClass());
+        client.start();
+        try (ClientSession session = createTestClientSession()) {
+            final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
+            final ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
+
+            final ChannelExec channel = session.createExecChannel("test");
+            channel.setStreaming(ClientChannel.Streaming.Async);
+            OpenFuture open = channel.open();
+
+            Thread.sleep(100); // Removing this line will make the test succeed
+            open.addListener(new SshFutureListener<OpenFuture>() {
+                @Override
+                public void operationComplete(OpenFuture future) {
+                    channel.getAsyncOut().read(new ByteArrayBuffer())
+                            .addListener(new SshFutureListener<IoReadFuture>() 
{
+                                @Override
+                                public void operationComplete(IoReadFuture 
future) {
+                                    try {
+                                        future.verify();
+                                        Buffer buffer = future.getBuffer();
+                                        baosOut.write(buffer.array(), 
buffer.rpos(), buffer.available());
+                                        buffer.rpos(buffer.rpos() + 
buffer.available());
+                                        buffer.compact();
+                                        
channel.getAsyncOut().read(buffer).addListener(this);
+                                    } catch (IOException e) {
+                                        if (!channel.isClosing()) {
+                                            log.error("Error reading", e);
+                                            channel.close(true);
+                                        }
+                                    }
+                                }
+                            });
+                    channel.getAsyncErr().read(new ByteArrayBuffer())
+                            .addListener(new SshFutureListener<IoReadFuture>() 
{
+                                @Override
+                                public void operationComplete(IoReadFuture 
future) {
+                                    try {
+                                        future.verify();
+                                        Buffer buffer = future.getBuffer();
+                                        baosErr.write(buffer.array(), 
buffer.rpos(), buffer.available());
+                                        buffer.rpos(buffer.rpos() + 
buffer.available());
+                                        buffer.compact();
+                                        
channel.getAsyncErr().read(buffer).addListener(this);
+                                    } catch (IOException e) {
+                                        if (!channel.isClosing()) {
+                                            log.error("Error reading", e);
+                                            channel.close(true);
+                                        }
+                                    }
+                                }
+                            });
+                }
+            });
+
+            channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0);
+
+            assertNotEquals(0, baosErr.size());
+        }
+    }
+
+    @Test
     public void testCommandDeadlock() throws Exception {
         client.start();
 

Reply via email to