[SSHD-449] SSH Exec channel with ClientChannel.Streaming.Async
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/a4f4e281 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/a4f4e281 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/a4f4e281 Branch: refs/heads/master Commit: a4f4e28112e83d2430f4431e9e07ebba07ddc484 Parents: a04f585 Author: Guillaume Nodet <[email protected]> Authored: Mon May 28 10:38:10 2018 +0200 Committer: Guillaume Nodet <[email protected]> Committed: Mon May 28 11:37:48 2018 +0200 ---------------------------------------------------------------------- .../common/channel/ChannelAsyncInputStream.java | 13 +++- .../java/org/apache/sshd/client/ClientTest.java | 63 ++++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/a4f4e281/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java index e699540..e0c6c26 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java @@ -64,7 +64,18 @@ public class ChannelAsyncInputStream extends AbstractCloseable implements IoInpu public IoReadFuture read(Buffer buf) { IoReadFutureImpl future = new IoReadFutureImpl(readFutureId, buf); if (isClosing()) { - future.setValue(new IOException("Closed")); + synchronized (buffer) { + if (pending != null) { + throw new ReadPendingException("Previous pending read not handled"); + } + if (buffer.available() > 0) { + int nbRead = future.buffer.putBuffer(buffer, false); + buffer.compact(); + future.setValue(nbRead); + } else { + future.setValue(new IOException("Closed")); + } + } } else { synchronized (buffer) { if (pending != null) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/a4f4e281/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java index 0026cc3..43d6d33 100644 --- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java @@ -588,6 +588,69 @@ public class ClientTest extends BaseTestSupport { } @Test + public void testExecAsyncClient() throws Exception { + Logger log = LoggerFactory.getLogger(getClass()); + client.start(); + try (ClientSession session = createTestClientSession()) { + final ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); + final ByteArrayOutputStream baosErr = new ByteArrayOutputStream(); + + final ChannelExec channel = session.createExecChannel("test"); + channel.setStreaming(ClientChannel.Streaming.Async); + OpenFuture open = channel.open(); + + Thread.sleep(100); // Removing this line will make the test succeed + open.addListener(new SshFutureListener<OpenFuture>() { + @Override + public void operationComplete(OpenFuture future) { + channel.getAsyncOut().read(new ByteArrayBuffer()) + .addListener(new SshFutureListener<IoReadFuture>() { + @Override + public void operationComplete(IoReadFuture future) { + try { + future.verify(); + Buffer buffer = future.getBuffer(); + baosOut.write(buffer.array(), buffer.rpos(), buffer.available()); + buffer.rpos(buffer.rpos() + buffer.available()); + buffer.compact(); + channel.getAsyncOut().read(buffer).addListener(this); + } catch (IOException e) { + if (!channel.isClosing()) { + log.error("Error reading", e); + channel.close(true); + } + } + } + }); + channel.getAsyncErr().read(new ByteArrayBuffer()) + .addListener(new SshFutureListener<IoReadFuture>() { + @Override + public void operationComplete(IoReadFuture future) { + try { + future.verify(); + Buffer buffer = future.getBuffer(); + baosErr.write(buffer.array(), buffer.rpos(), buffer.available()); + buffer.rpos(buffer.rpos() + buffer.available()); + buffer.compact(); + channel.getAsyncErr().read(buffer).addListener(this); + } catch (IOException e) { + if (!channel.isClosing()) { + log.error("Error reading", e); + channel.close(true); + } + } + } + }); + } + }); + + channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0); + + assertNotEquals(0, baosErr.size()); + } + } + + @Test public void testCommandDeadlock() throws Exception { client.start();
