[SSHD-218] Possible deadlock in client when getting SSH_MSG_DISCONNECT on invalid channel
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/8bfdf38c Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/8bfdf38c Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/8bfdf38c Branch: refs/heads/master Commit: 8bfdf38cd74ff669fcdb8ca78bd34e77decbc98e Parents: 4e75e8f Author: Guillaume Nodet <[email protected]> Authored: Tue Jul 23 12:09:26 2013 +0200 Committer: Guillaume Nodet <[email protected]> Committed: Tue Jul 23 12:09:26 2013 +0200 ---------------------------------------------------------------------- .../client/channel/AbstractClientChannel.java | 96 +++++++++----------- .../sshd/client/channel/ChannelSession.java | 2 +- .../sshd/common/channel/AbstractChannel.java | 86 +++++++++--------- .../sshd/common/forward/TcpipServerChannel.java | 20 ++-- .../sshd/server/channel/ChannelSession.java | 38 ++++---- .../sshd/server/command/UnknownCommand.java | 24 +++-- .../test/java/org/apache/sshd/ClientTest.java | 35 +++++++ 7 files changed, 167 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index eb2ea17..8d04dba 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -40,7 +40,7 @@ import org.apache.sshd.common.util.IoUtils; */ public abstract class AbstractClientChannel extends AbstractChannel implements ClientChannel { - protected boolean opened; + protected volatile boolean opened; protected final String type; protected InputStream in; protected OutputStream invertedIn; @@ -96,29 +96,27 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C @Override public CloseFuture close(final boolean immediately) { - synchronized (lock) { - if (!closeFuture.isDone()) { - if (opened) { + if (!closeFuture.isDone()) { + if (opened) { + super.close(immediately); + } else if (openFuture != null) { + if (immediately) { + openFuture.setException(new SshException("Channel closed")); super.close(immediately); - } else if (openFuture != null) { - if (immediately) { - openFuture.setException(new SshException("Channel closed")); - super.close(immediately); - } else { - openFuture.addListener(new SshFutureListener<OpenFuture>() { - public void operationComplete(OpenFuture future) { - if (future.isOpened()) { - close(immediately); - } else { - close(true); - } - } - }); - } } else { - closeFuture.setClosed(); - lock.notifyAll(); + openFuture.addListener(new SshFutureListener<OpenFuture>() { + public void operationComplete(OpenFuture future) { + if (future.isOpened()) { + close(immediately); + } else { + close(true); + } + } + }); } + } else { + closeFuture.setClosed(); + notifyStateChanged(); } } return closeFuture; @@ -200,20 +198,18 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C } public void handleOpenSuccess(int recipient, int rwsize, int rmpsize, Buffer buffer) { - synchronized (lock) { - this.recipient = recipient; - this.remoteWindow.init(rwsize, rmpsize); - try { - doOpen(); - this.opened = true; - this.openFuture.setOpened(); - } catch (Exception e) { - this.openFuture.setException(e); - this.closeFuture.setClosed(); - this.doClose(); - } finally { - lock.notifyAll(); - } + this.recipient = recipient; + this.remoteWindow.init(rwsize, rmpsize); + try { + doOpen(); + this.opened = true; + this.openFuture.setOpened(); + } catch (Exception e) { + this.openFuture.setException(e); + this.closeFuture.setClosed(); + this.doClose(); + } finally { + notifyStateChanged(); } } @@ -222,20 +218,20 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C public void handleOpenFailure(Buffer buffer) { int reason = buffer.getInt(); String msg = buffer.getString(); - synchronized (lock) { - this.openFailureReason = reason; - this.openFailureMsg = msg; - this.openFuture.setException(new SshException(msg)); - this.closeFuture.setClosed(); - this.doClose(); - lock.notifyAll(); - } + this.openFailureReason = reason; + this.openFailureMsg = msg; + this.openFuture.setException(new SshException(msg)); + this.closeFuture.setClosed(); + this.doClose(); + notifyStateChanged(); } protected void doWriteData(byte[] data, int off, int len) throws IOException { if (out != null) { out.write(data, off, len); out.flush(); + } else { + throw new IllegalStateException("No output stream for channel"); } localWindow.consumeAndCheck(len); } @@ -244,6 +240,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C if (err != null) { err.write(data, off, len); err.flush(); + } else { + throw new IllegalStateException("No error stream for channel"); } localWindow.consumeAndCheck(len); } @@ -253,16 +251,12 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C String req = buffer.getString(); if ("exit-status".equals(req)) { buffer.getBoolean(); - synchronized (lock) { - exitStatus = buffer.getInt(); - lock.notifyAll(); - } + exitStatus = buffer.getInt(); + notifyStateChanged(); } else if ("exit-signal".equals(req)) { buffer.getBoolean(); - synchronized (lock) { - exitSignal = buffer.getString(); - lock.notifyAll(); - } + exitSignal = buffer.getString(); + notifyStateChanged(); } // TODO: handle other channel requests } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java index ee48e0d..3166666 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java @@ -98,7 +98,7 @@ public class ChannelSession extends AbstractClientChannel { } } } catch (Exception e) { - if (!closing) { + if (!closing.get()) { log.info("Caught exception", e); close(false); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index ff2a306..ae2a9e1 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -19,7 +19,7 @@ package org.apache.sshd.common.channel; import java.io.IOException; -import java.io.InterruptedIOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.future.WriteFuture; @@ -51,9 +51,9 @@ public abstract class AbstractChannel implements Channel { protected Session session; protected int id; protected int recipient; - protected boolean eof; protected final CloseFuture closeFuture = new DefaultCloseFuture(lock); - protected boolean closing; + protected volatile boolean eof; + protected final AtomicBoolean closing = new AtomicBoolean(); protected boolean closedByOtherSide; public int getId() { @@ -86,56 +86,56 @@ public abstract class AbstractChannel implements Channel { configureWindow(); } + protected void notifyStateChanged() { + synchronized (lock) { + lock.notifyAll(); + } + } + public CloseFuture close(boolean immediately) { if (closeFuture.isClosed()) { return closeFuture; } - try { - synchronized (lock) { + if (closing.compareAndSet(false, true)) { + try { if (immediately) { log.debug("Closing channel {} immediately", id); + doClose(); closeFuture.setClosed(); - lock.notifyAll(); + notifyStateChanged(); session.unregisterChannel(this); } else { - if (!closing) { - closing = true; - log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id); - Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0); - buffer.putInt(recipient); - session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() { - public void operationComplete(WriteFuture future) { - synchronized (lock) { - if (closedByOtherSide) { - log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id); - closeFuture.setClosed(); - doClose(); - lock.notifyAll(); - } - } + log.debug("Closing channel {} gracefully", id); + doClose(); + log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id); + Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0); + buffer.putInt(recipient); + session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() { + public void operationComplete(WriteFuture future) { + if (closedByOtherSide) { + log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id); + closeFuture.setClosed(); + notifyStateChanged(); } - }); - } + } + }); } + } catch (IOException e) { + session.exceptionCaught(e); + closeFuture.setClosed(); } - } catch (IOException e) { - session.exceptionCaught(e); - closeFuture.setClosed(); } return closeFuture; } public void handleClose() throws IOException { log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", id); - synchronized (lock) { - closedByOtherSide = !closing; - if (closedByOtherSide) { - close(false); - } else { - close(false).setClosed(); - doClose(); - lock.notifyAll(); - } + closedByOtherSide = !closing.get(); + if (closedByOtherSide) { + close(false); + } else { + close(false).setClosed(); + notifyStateChanged(); } } @@ -143,12 +143,10 @@ public abstract class AbstractChannel implements Channel { } protected void writePacket(Buffer buffer) throws IOException { - synchronized (lock) { - if (!closing) { - session.writePacket(buffer); - } else { - log.debug("Discarding output packet because channel is being closed"); - } + if (!closing.get()) { + session.writePacket(buffer); + } else { + log.debug("Discarding output packet because channel is being closed"); } } @@ -187,10 +185,8 @@ public abstract class AbstractChannel implements Channel { public void handleEof() throws IOException { log.debug("Received SSH_MSG_CHANNEL_EOF on channel {}", id); - synchronized (lock) { - eof = true; - lock.notifyAll(); - } + eof = true; + notifyStateChanged(); } public void handleWindowAdjust(Buffer buffer) throws IOException { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java index 4c9fa28..5bd314b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java @@ -117,17 +117,15 @@ public class TcpipServerChannel extends AbstractServerChannel { IoHandler handler = new IoHandlerAdapter() { @Override public void messageReceived(IoSession session, Object message) throws Exception { - synchronized (lock) { - if (closing) { - log.debug("Ignoring write to channel {} in CLOSING state", id); - } else { - IoBuffer ioBuffer = (IoBuffer) message; - int r = ioBuffer.remaining(); - byte[] b = new byte[r]; - ioBuffer.get(b, 0, r); - out.write(b, 0, r); - out.flush(); - } + if (closing.get()) { + log.debug("Ignoring write to channel {} in CLOSING state", id); + } else { + IoBuffer ioBuffer = (IoBuffer) message; + int r = ioBuffer.remaining(); + byte[] b = new byte[r]; + ioBuffer.get(b, 0, r); + out.write(b, 0, r); + out.flush(); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index a357305..5388d8a 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -175,23 +175,21 @@ public class ChannelSession extends AbstractServerChannel { public ChannelSession() { } - public CloseFuture close(boolean immediately) { - return super.close(immediately).addListener(new SshFutureListener() { - public void operationComplete(SshFuture sshFuture) { - if (command != null) { - command.destroy(); - command = null; - } - remoteWindow.notifyClosed(); - IoUtils.closeQuietly(out, err, receiver); - } - }); + @Override + protected void doClose() { + if (command != null) { + command.destroy(); + command = null; + } + remoteWindow.notifyClosed(); + IoUtils.closeQuietly(out, err, receiver); + super.doClose(); } @Override public void handleEof() throws IOException { super.handleEof(); - receiver.close(); + IoUtils.closeQuietly(receiver); } public void handleRequest(Buffer buffer) throws IOException { @@ -206,6 +204,10 @@ public class ChannelSession extends AbstractServerChannel { } protected void doWriteData(byte[] data, int off, int len) throws IOException { + // If we're already closing, ignore incoming data + if (closing.get()) { + return; + } if (receiver != null) { int r = receiver.data(this, data, off, len); if (r > 0) { @@ -563,13 +565,11 @@ public class ChannelSession extends AbstractServerChannel { } protected void closeShell(int exitValue) throws IOException { - synchronized (lock) { - if (!closing) { - sendEof(); - sendExitStatus(exitValue); - // TODO: We should wait for all streams to be consumed before closing the channel - close(false); - } + if (!closing.get()) { + sendEof(); + sendExitStatus(exitValue); + // TODO: We should wait for all streams to be consumed before closing the channel + close(false); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java b/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java index 7bcc1c3..95c0fda 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java @@ -35,31 +35,41 @@ import org.apache.sshd.server.ExitCallback; */ public class UnknownCommand implements Command { + private String command; + private InputStream in; + private OutputStream out; + private OutputStream err; + private ExitCallback callback; + public UnknownCommand(String command) { + this.command = command; } public void setInputStream(InputStream in) { - //To change body of implemented methods use File | Settings | File Templates. + this.in = in; } public void setOutputStream(OutputStream out) { - //To change body of implemented methods use File | Settings | File Templates. + this.out = out; } public void setErrorStream(OutputStream err) { - //To change body of implemented methods use File | Settings | File Templates. + this.err = err; } public void setExitCallback(ExitCallback callback) { - //To change body of implemented methods use File | Settings | File Templates. + this.callback = callback; } public void start(Environment env) throws IOException { - //To change body of implemented methods use File | Settings | File Templates. - // TODO: send back an error ? + err.write(("Unknown command: " + command + "\n").getBytes()); + err.flush(); + if (callback != null) { + callback.onExit(1, "Unknown command: " + command); + } } public void destroy() { - //To change body of implemented methods use File | Settings | File Templates. } + } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/test/java/org/apache/sshd/ClientTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java index 8bf93b4..5db1059 100644 --- a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java @@ -28,6 +28,7 @@ import java.security.KeyPair; import java.util.concurrent.CountDownLatch; import org.apache.mina.core.future.WriteFuture; +import org.apache.sshd.client.channel.ChannelExec; import org.apache.sshd.client.future.AuthFuture; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.KeyPairProvider; @@ -37,14 +38,19 @@ import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.session.AbstractSession; import org.apache.sshd.common.util.Buffer; import org.apache.sshd.common.util.BufferUtils; +import org.apache.sshd.common.util.NoCloseOutputStream; import org.apache.sshd.server.Command; +import org.apache.sshd.server.CommandFactory; +import org.apache.sshd.server.command.UnknownCommand; import org.apache.sshd.util.*; import org.junit.After; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; /** @@ -67,6 +73,11 @@ public class ClientTest { sshd.setPort(port); sshd.setKeyPairProvider(Utils.createTestHostKeyProvider()); sshd.setShellFactory(new TestEchoShellFactory()); + sshd.setCommandFactory(new CommandFactory() { + public Command createCommand(String command) { + return new UnknownCommand(command); + } + }); sshd.setPasswordAuthenticator(new BogusPasswordAuthenticator()); sshd.setPublickeyAuthenticator(new BogusPublickeyAuthenticator()); sshd.start(); @@ -81,6 +92,30 @@ public class ClientTest { } @Test + public void testCommandDeadlock() throws Exception { + SshClient client = SshClient.setUpDefaultClient(); + client.start(); + ClientSession session = client.connect("localhost", port).await().getSession(); + session.authPassword("smx", "smx").await().isSuccess(); + ChannelExec channel = session.createExecChannel("test"); + channel.setOut(new NoCloseOutputStream(System.out)); + channel.setErr(new NoCloseOutputStream(System.err)); + channel.open().await(); + Thread.sleep(100); + try { + for (int i = 0; i < 100; i++) { + channel.getInvertedIn().write("a".getBytes()); + channel.getInvertedIn().flush(); + } + } catch (SshException e) { + // That's ok, the channel is being closed by the other side + } + assertEquals(ChannelExec.CLOSED, channel.waitFor(ChannelExec.CLOSED, 0) & ChannelExec.CLOSED); + session.close(false).await(); + client.stop(); + } + + @Test public void testClient() throws Exception { SshClient client = SshClient.setUpDefaultClient(); client.start();
