[SSHD-246] Let commands finish stream consumption and cleanly exit Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/f549a71b Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/f549a71b Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/f549a71b
Branch: refs/heads/master Commit: f549a71bc1559df036b2149db0f2a405595b329b Parents: 725bcd9 Author: Guillaume Nodet <[email protected]> Authored: Fri Jul 26 10:37:12 2013 +0200 Committer: Guillaume Nodet <[email protected]> Committed: Fri Jul 26 10:37:12 2013 +0200 ---------------------------------------------------------------------- .../sshd/agent/unix/AgentForwardedChannel.java | 4 +- .../client/channel/AbstractClientChannel.java | 8 +- .../sshd/client/channel/ChannelSession.java | 5 +- .../sshd/common/channel/AbstractChannel.java | 77 ++++++++++++-------- .../common/channel/ChannelPipedInputStream.java | 4 +- .../sshd/common/forward/TcpipClientChannel.java | 14 +++- .../sshd/server/ServerFactoryManager.java | 7 ++ .../server/channel/AbstractServerChannel.java | 1 + .../sshd/server/channel/ChannelSession.java | 45 +++++++++++- .../sshd/server/x11/X11ForwardSupport.java | 14 +++- 10 files changed, 131 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java index 3207074..4f23584 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java @@ -66,9 +66,9 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn } @Override - protected synchronized void doClose() { + protected synchronized void postClose() { Socket.close(socket); - super.doClose(); + super.postClose(); } protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index b786c39..6a332d4 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -123,9 +123,9 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C } @Override - protected void doClose() { - super.doClose(); + protected void postClose() { IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr, in, out, err); + super.postClose(); } public int waitFor(int mask, long timeout) { @@ -207,7 +207,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C } catch (Exception e) { this.openFuture.setException(e); this.closeFuture.setClosed(); - this.doClose(); + this.postClose(); } finally { notifyStateChanged(); } @@ -222,7 +222,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C this.openFailureMsg = msg; this.openFuture.setException(new SshException(msg)); this.closeFuture.setClosed(); - this.doClose(); + this.postClose(); notifyStateChanged(); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java index d6c9124..e18378a 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java @@ -26,6 +26,7 @@ import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.channel.ChannelPipedInputStream; import org.apache.sshd.common.channel.ChannelPipedOutputStream; +import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.util.Buffer; /** @@ -76,12 +77,12 @@ public class ChannelSession extends AbstractClientChannel { } @Override - protected void doClose() { - super.doClose(); + protected void postClose() { if (streamPumper != null) { streamPumper.interrupt(); streamPumper = null; } + super.postClose(); } protected void pumpInputStream() { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index 71d9fa4..a0caaf8 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -29,6 +29,9 @@ import org.apache.sshd.common.Session; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.DefaultCloseFuture; +import org.apache.sshd.common.future.DefaultSshFuture; +import org.apache.sshd.common.future.SshFuture; +import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.util.Buffer; import org.apache.sshd.common.util.BufferUtils; import org.slf4j.Logger; @@ -93,36 +96,43 @@ public abstract class AbstractChannel implements Channel { } public CloseFuture close(boolean immediately) { - if (closeFuture.isClosed()) { - return closeFuture; - } if (closing.compareAndSet(false, true)) { - try { - if (immediately) { - log.debug("Closing channel {} immediately", id); - doClose(); - closeFuture.setClosed(); - notifyStateChanged(); - session.unregisterChannel(this); - } else { - log.debug("Closing channel {} gracefully", id); - doClose(); - log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id); - Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0); - buffer.putInt(recipient); - session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() { - public void operationComplete(WriteFuture future) { - if (closedByOtherSide) { - log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id); - closeFuture.setClosed(); - notifyStateChanged(); - } + if (immediately) { + log.debug("Closing channel {} immediately", id); + preClose(immediately).addListener(new SshFutureListener<CloseFuture>() { + public void operationComplete(CloseFuture future) { + postClose(); + closeFuture.setClosed(); + notifyStateChanged(); + session.unregisterChannel(AbstractChannel.this); + } + }); + } else { + log.debug("Closing channel {} gracefully", id); + preClose(immediately).addListener(new SshFutureListener<CloseFuture>() { + public void operationComplete(CloseFuture future) { + log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id); + Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0); + buffer.putInt(recipient); + try { + session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() { + public void operationComplete(WriteFuture future) { + if (closedByOtherSide) { + log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id); + postClose(); + closeFuture.setClosed(); + notifyStateChanged(); + } + } + }); + } catch (IOException e) { + log.debug("Exception caught while writing SSH_MSG_CHANNEL_CLOSE packet on channel " + id, e); + postClose(); + closeFuture.setClosed(); + notifyStateChanged(); } - }); - } - } catch (IOException e) { - session.exceptionCaught(e); - closeFuture.setClosed(); + } + }); } } return closeFuture; @@ -134,12 +144,19 @@ public abstract class AbstractChannel implements Channel { if (closedByOtherSide) { close(false); } else { - close(false).setClosed(); + postClose(); + closeFuture.setClosed(); notifyStateChanged(); } } - protected void doClose() { + protected CloseFuture preClose(boolean immediately) { + CloseFuture future = new DefaultCloseFuture(lock); + future.setClosed(); + return future; + } + + protected void postClose() { } protected void writePacket(Buffer buffer) throws IOException { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java index 1d7b489..2257cd9 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java @@ -40,6 +40,7 @@ public class ChannelPipedInputStream extends InputStream { private final Buffer buffer = new Buffer(); private final byte[] b = new byte[1]; private boolean closed; + private boolean eofSent; private final Lock lock = new ReentrantLock(); private final Condition dataAvailable = lock.newCondition(); @@ -96,13 +97,14 @@ public class ChannelPipedInputStream extends InputStream { lock.lock(); try { for (;;) { - if (closed && !writerClosed) { + if (closed && writerClosed && eofSent || closed && !writerClosed) { throw new IOException("Pipe closed"); } if (buffer.available() > 0) { break; } if (writerClosed) { + eofSent = true; return -1; // no more data to read } try { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java index d0c4de8..ccd1f77 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.session.IoSession; import org.apache.sshd.client.channel.AbstractClientChannel; import org.apache.sshd.client.future.DefaultOpenFuture; @@ -30,6 +31,8 @@ import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; import org.apache.sshd.common.SshdSocketAddress; import org.apache.sshd.common.channel.ChannelOutputStream; +import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.future.DefaultCloseFuture; import org.apache.sshd.common.util.Buffer; /** @@ -96,9 +99,14 @@ public class TcpipClientChannel extends AbstractClientChannel { } @Override - protected synchronized void doClose() { - serverSession.close(false); - super.doClose(); + protected synchronized CloseFuture preClose(boolean immediately) { + final CloseFuture future = new DefaultCloseFuture(null); + serverSession.close(immediately).addListener(new IoFutureListener<org.apache.mina.core.future.CloseFuture>() { + public void operationComplete(org.apache.mina.core.future.CloseFuture f) { + future.setClosed(); + } + }); + return future; } protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java index 81a437d..d47e10c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java @@ -83,6 +83,13 @@ public interface ServerFactoryManager extends FactoryManager { public static final String AUTH_METHODS = "auth-methods"; /** + * Key used to configure the timeout used when receiving a close request + * on a channel to wait until the command cleanly exits after setting + * an EOF on the input stream. In milliseconds. + */ + public static final String COMMAND_EXIT_TIMEOUT = "command-exit-timeout"; + + /** * Retrieve the list of named factories for <code>UserAuth<code> objects. * * @return a list of named <code>UserAuth</code> factories, never <code>null</code> http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java index 4ab0412..e96f483 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java @@ -66,6 +66,7 @@ public abstract class AbstractServerChannel extends AbstractChannel { buffer.putByte((byte) 0); buffer.putInt(v); writePacket(buffer); + notifyStateChanged(); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index 5388d8a..1fcf6d0 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -25,8 +25,10 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; import org.apache.sshd.agent.SshAgent; import org.apache.sshd.agent.SshAgentFactory; @@ -37,6 +39,7 @@ import org.apache.sshd.common.PtyMode; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.future.DefaultCloseFuture; import org.apache.sshd.common.future.SshFuture; import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.util.Buffer; @@ -48,6 +51,7 @@ import org.apache.sshd.server.Environment; import org.apache.sshd.server.ExitCallback; import org.apache.sshd.common.file.FileSystemAware; import org.apache.sshd.common.file.FileSystemFactory; +import org.apache.sshd.server.ServerFactoryManager; import org.apache.sshd.server.SessionAware; import org.apache.sshd.server.Signal; import org.apache.sshd.server.SignalListener; @@ -61,6 +65,8 @@ import org.apache.sshd.server.x11.X11ForwardSupport; */ public class ChannelSession extends AbstractServerChannel { + public static final long DEFAULT_COMMAND_EXIT_TIMEOUT = 5000; + public static class Factory implements NamedFactory<Channel> { public String getName() { @@ -171,19 +177,52 @@ public class ChannelSession extends AbstractServerChannel { protected ChannelDataReceiver receiver; protected StandardEnvironment env = new StandardEnvironment(); protected Buffer tempBuffer; + protected final CloseFuture commandExitFuture = new DefaultCloseFuture(lock); public ChannelSession() { } @Override - protected void doClose() { + protected CloseFuture preClose(boolean immediately) { + if (immediately) { + commandExitFuture.setClosed(); + } else if (!commandExitFuture.isClosed()) { + log.debug("Wait 5s for shell to exit cleanly"); + IoUtils.closeQuietly(receiver); + final TimerTask task = new TimerTask() { + @Override + public void run() { + commandExitFuture.setClosed(); + } + }; + long timeout = DEFAULT_COMMAND_EXIT_TIMEOUT; + String val = getSession().getFactoryManager().getProperties().get(ServerFactoryManager.COMMAND_EXIT_TIMEOUT); + if (val != null) { + try { + timeout = Long.parseLong(val); + } catch (NumberFormatException e) { + // Ignore + } + } + getSession().getFactoryManager().getScheduledExecutorService().schedule(task, timeout, TimeUnit.MILLISECONDS); + commandExitFuture.addListener(new SshFutureListener<CloseFuture>() { + public void operationComplete(CloseFuture future) { + task.cancel(); + } + }); + } + return commandExitFuture; + } + + @Override + protected void postClose() { if (command != null) { command.destroy(); command = null; } remoteWindow.notifyClosed(); IoUtils.closeQuietly(out, err, receiver); - super.doClose(); + super.postClose(); } @Override @@ -568,9 +607,9 @@ public class ChannelSession extends AbstractServerChannel { if (!closing.get()) { sendEof(); sendExitStatus(exitValue); - // TODO: We should wait for all streams to be consumed before closing the channel close(false); } + commandExitFuture.setClosed(); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java index 37ec331..84d8e9a 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java @@ -24,6 +24,7 @@ import java.net.InetSocketAddress; import java.util.EnumSet; import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoEventType; @@ -36,6 +37,8 @@ import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; import org.apache.sshd.common.channel.ChannelOutputStream; +import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.future.DefaultCloseFuture; import org.apache.sshd.common.util.Buffer; import org.apache.sshd.server.session.ServerSession; import org.slf4j.Logger; @@ -207,9 +210,14 @@ public class X11ForwardSupport extends IoHandlerAdapter { } @Override - protected synchronized void doClose() { - serverSession.close(false); - super.doClose(); + protected synchronized CloseFuture preClose(boolean immediately) { + final CloseFuture future = new DefaultCloseFuture(null); + serverSession.close(immediately).addListener(new IoFutureListener<org.apache.mina.core.future.CloseFuture>() { + public void operationComplete(org.apache.mina.core.future.CloseFuture f) { + future.setClosed(); + } + }); + return future; } protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {
