Repository: mina-sshd Updated Branches: refs/heads/master a9d975b6d -> b0b8d3466
[SSHD-569] Reset session idle timeout while waiting for channel window availability Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/b0b8d346 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/b0b8d346 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/b0b8d346 Branch: refs/heads/master Commit: b0b8d34664618d561187996639299eed04867ebf Parents: a9d975b Author: Lyor Goldstein <[email protected]> Authored: Mon Oct 19 07:44:46 2015 +0300 Committer: Lyor Goldstein <[email protected]> Committed: Mon Oct 19 07:44:46 2015 +0300 ---------------------------------------------------------------------- .../org/apache/sshd/common/FactoryManager.java | 2 +- .../common/channel/ChannelOutputStream.java | 64 +++++++--- .../common/channel/ChannelPipedInputStream.java | 1 + .../java/org/apache/sshd/WindowAdjustTest.java | 123 +++++++++++++------ .../sshd/client/subsystem/sftp/SftpTest.java | 3 + 5 files changed, 140 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b0b8d346/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java index dc0ff3d..14f688c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java @@ -111,7 +111,7 @@ public interface FactoryManager extends SessionListenerManager, ChannelListenerM /** * Key used to retrieve the value of idle timeout after which * it will close the connection - in milliseconds. 
- * @see #DEFAULT_AUTH_TIMEOUT + * @see #DEFAULT_IDLE_TIMEOUT */ String IDLE_TIMEOUT = "idle-timeout"; http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b0b8d346/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java index 0ba9c39..4f17882 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java @@ -23,9 +23,12 @@ import java.io.InterruptedIOException; import java.io.OutputStream; import java.nio.channels.Channel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.sshd.common.PropertyResolverUtils; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; +import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.slf4j.Logger; @@ -48,8 +51,8 @@ public class ChannelOutputStream extends OutputStream implements Channel { private final Logger log; private final byte cmd; private final byte[] b = new byte[1]; + private final AtomicBoolean closedState = new AtomicBoolean(false); private Buffer buffer; - private boolean closed; private int bufferLength; private int lastSize; private boolean noDelay; @@ -78,7 +81,7 @@ public class ChannelOutputStream extends OutputStream implements Channel { @Override public boolean isOpen() { - return !closed; + return !closedState.get(); } @Override @@ -90,9 +93,10 @@ public class ChannelOutputStream extends OutputStream implements Channel { @Override public synchronized void write(byte[] buf, int s, int l) throws IOException { if (!isOpen()) { - throw new 
SshException("write(len=" + l + ") channel already closed"); + throw new SshException("write(" + this + ") len=" + l + " - channel already closed"); } + Session session = channel.getSession(); while (l > 0) { // The maximum amount we should admit without flushing again // is enough to make up one full packet within our allowed @@ -100,21 +104,26 @@ public class ChannelOutputStream extends OutputStream implements Channel { // packet we sent to allow the producer to race ahead and fill // out the next packet before we block and wait for space to // become available again. - // int l2 = Math.min(l, Math.min(remoteWindow.getSize() + lastSize, remoteWindow.getPacketSize()) - bufferLength); if (l2 <= 0) { if (bufferLength > 0) { flush(); } else { + session.resetIdleTimeout(); try { remoteWindow.waitForSpace(maxWaitTimeout); } catch (WindowClosedException e) { - closed = true; + if (!closedState.getAndSet(true)) { + if (log.isDebugEnabled()) { + log.debug("write({})[len={}] closing due to window closed", this, l); + } + } throw e; } catch (InterruptedException e) { - throw (IOException) new InterruptedIOException("Interrupted while waiting for remote space").initCause(e); + throw (IOException) new InterruptedIOException("Interrupted while waiting for remote space on write len=" + l + " to " + this).initCause(e); } } + session.resetIdleTimeout(); continue; } buffer.putRawBytes(buf, s, l2); @@ -122,19 +131,25 @@ public class ChannelOutputStream extends OutputStream implements Channel { s += l2; l -= l2; } - if (noDelay) { + + if (isNoDelay()) { flush(); + } else { + session.resetIdleTimeout(); } } @Override public synchronized void flush() throws IOException { if (!isOpen()) { - throw new SshException("flush(length=" + bufferLength + ") - stream is already closed"); + throw new SshException("flush(" + this + ") length=" + bufferLength + " - stream is already closed"); } try { + Session session = channel.getSession(); while (bufferLength > 0) { + session.resetIdleTimeout(); 
+ Buffer buf = buffer; int total = bufferLength; int available = remoteWindow.waitForSpace(maxWaitTimeout); @@ -152,16 +167,22 @@ public class ChannelOutputStream extends OutputStream implements Channel { bufferLength = leftover; } lastSize = length; + + session.resetIdleTimeout(); remoteWindow.waitAndConsume(length, maxWaitTimeout); - if (log.isDebugEnabled()) { - log.debug("Send {} on channel {}", - (cmd == SshConstants.SSH_MSG_CHANNEL_DATA) ? "SSH_MSG_CHANNEL_DATA" : "SSH_MSG_CHANNEL_EXTENDED_DATA", - Integer.valueOf(channel.getId())); + if (log.isTraceEnabled()) { + log.trace("Send {} on channel {}", + (cmd == SshConstants.SSH_MSG_CHANNEL_DATA) ? "SSH_MSG_CHANNEL_DATA" : "SSH_MSG_CHANNEL_EXTENDED_DATA", + channel); } channel.writePacket(buf); } } catch (WindowClosedException e) { - closed = true; + if (!closedState.getAndSet(true)) { + if (log.isDebugEnabled()) { + log.debug("flush({}) closing due to window closed", this); + } + } throw e; } catch (Exception e) { if (e instanceof IOException) { @@ -175,16 +196,26 @@ public class ChannelOutputStream extends OutputStream implements Channel { @Override public synchronized void close() throws IOException { if (isOpen()) { + if (log.isTraceEnabled()) { + log.trace("close({}) closing", this); + } + try { flush(); } finally { - closed = true; + closedState.set(true); } } } - private void newBuffer(int size) { - buffer = channel.getSession().createBuffer(cmd, size <= 0 ? 0 : 12 + size); + @Override + public String toString() { + return getClass().getSimpleName() + "[" + channel + "]"; + } + + protected void newBuffer(int size) { + Session session = channel.getSession(); + buffer = session.createBuffer(cmd, size <= 0 ? 
12 : 12 + size); buffer.putInt(channel.getRecipient()); if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) { buffer.putInt(1); @@ -192,5 +223,4 @@ public class ChannelOutputStream extends OutputStream implements Channel { buffer.putInt(0); bufferLength = 0; } - } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b0b8d346/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java index 61c5ec9..c9bcb32 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java @@ -139,6 +139,7 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped throw (IOException) new InterruptedIOException("Interrupted at cycle #" + index + " while waiting for data to become available").initCause(e); } } + if (len > buffer.available()) { len = buffer.available(); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b0b8d346/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java index 22c2533..0380e56 100644 --- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java @@ -21,14 +21,17 @@ package org.apache.sshd; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collection; import java.util.Deque; import java.util.EnumSet; 
import java.util.LinkedList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.sshd.client.SshClient; import org.apache.sshd.client.channel.ClientChannel; @@ -42,6 +45,8 @@ import org.apache.sshd.common.io.WritePendingException; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.io.NoCloseOutputStream; +import org.apache.sshd.common.util.logging.AbstractLoggingBean; +import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.sshd.server.AsyncCommand; import org.apache.sshd.server.Command; import org.apache.sshd.server.Environment; @@ -54,6 +59,8 @@ import org.junit.Before; import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * <p> @@ -66,7 +73,8 @@ import org.junit.runners.MethodSorters; @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class WindowAdjustTest extends BaseTestSupport { - public static final String END_FILE = "#"; + public static final byte END_FILE = '#'; + public static final int BIG_MSG_SEND_COUNT = 10000; private SshServer sshServer; private int port; @@ -78,13 +86,13 @@ public class WindowAdjustTest extends BaseTestSupport { @Before public void setUp() throws Exception { sshServer = setupTestServer(); + final byte[] msg = Files.readAllBytes( Paths.get(getClass().getResource("/big-msg.txt").toURI())); - sshServer.setShellFactory(new Factory<Command>() { @Override public Command create() { - return new FloodingAsyncCommand(msg, 10000, END_FILE); + return new FloodingAsyncCommand(msg, BIG_MSG_SEND_COUNT, END_FILE); } }); @@ -101,7 +109,7 @@ public class WindowAdjustTest extends BaseTestSupport { } } - @Test(timeout = 5L * 60L * 1000L) + 
@Test(timeout = 6L * 60L * 1000L) public void testTrafficHeavyLoad() throws Exception { try (SshClient client = setupTestClient()) { @@ -114,10 +122,10 @@ public class WindowAdjustTest extends BaseTestSupport { try (final ClientChannel channel = session.createShellChannel()) { channel.setOut(new VerifyingOutputStream(channel, END_FILE)); channel.setErr(new NoCloseOutputStream(System.err)); - channel.open(); + channel.open().verify(15L, TimeUnit.SECONDS); Collection<ClientChannel.ClientChannelEvent> result = - channel.waitFor(EnumSet.of(ClientChannel.ClientChannelEvent.CLOSED), TimeUnit.SECONDS.toMillis(15L)); + channel.waitFor(EnumSet.of(ClientChannel.ClientChannelEvent.CLOSED), TimeUnit.MINUTES.toMillis(2L)); assertFalse("Timeout while waiting for channel closure", result.contains(ClientChannel.ClientChannelEvent.TIMEOUT)); } } finally { @@ -130,33 +138,61 @@ public class WindowAdjustTest extends BaseTestSupport { * Read all incoming data and if END_FILE symbol is detected, kill client session to end test */ private static class VerifyingOutputStream extends OutputStream { - + private final Logger log; private final ClientChannel channel; - private String endFile; + private final byte eofSignal; - public VerifyingOutputStream(ClientChannel channel, final String lastMsg) { + public VerifyingOutputStream(ClientChannel channel, final byte eofSignal) { + this.log = LoggerFactory.getLogger(getClass()); this.channel = channel; - this.endFile = lastMsg; + this.eofSignal = eofSignal; } @Override public void write(int b) throws IOException { - if (String.valueOf((char) b).equals(endFile)) { + if (channel.isClosed() || channel.isClosing()) { + throw new IOException("Channel (" + channel + ") is closing / closed on write single byte"); + } + + if (b == (eofSignal & 0xff)) { + log.info("Closing channel (" + channel + ") due to single byte EOF"); + channel.close(true); + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if 
(channel.isClosed() || channel.isClosing()) { + throw new IOException("Channel (" + channel + ") is closing / closed on write " + len + " bytes"); + } + + if (len <= 0) { + return; + } + + int lastPos = off + len - 1; + if ((b[lastPos] & 0xFF) == (eofSignal & 0xFF)) { + log.info("Closing channel (" + channel + ") due to last byte EOF"); channel.close(true); } } } - public static final class FloodingAsyncCommand implements AsyncCommand { + public static final class FloodingAsyncCommand extends AbstractLoggingBean implements AsyncCommand { + private static final AtomicInteger poolCount = new AtomicInteger(0); + private final AtomicReference<ExecutorService> executorHolder = new AtomicReference<>(); + private final AtomicReference<Future<?>> futureHolder = new AtomicReference<Future<?>>(); + + private AsyncInPendingWrapper pendingWrapper; private byte[] msg; private int sendCount; - private String lastMsg; + private byte eofSignal; - public FloodingAsyncCommand(final byte[] msg, final int sendCount, final String lastMsg) { + public FloodingAsyncCommand(final byte[] msg, final int sendCount, final byte eofSignal) { this.msg = msg; this.sendCount = sendCount; - this.lastMsg = lastMsg; + this.eofSignal = eofSignal; } @Override @@ -166,18 +202,7 @@ public class WindowAdjustTest extends BaseTestSupport { @Override public void setIoOutputStream(IoOutputStream out) { - final AsyncInPendingWrapper a = new AsyncInPendingWrapper(out); - - new Thread(new Runnable() { - @SuppressWarnings("synthetic-access") - @Override - public void run() { - for (int i = 0; i < sendCount; i++) { - a.write(new ByteArrayBuffer(msg)); - } - a.write(new ByteArrayBuffer((lastMsg.getBytes(StandardCharsets.UTF_8)))); - } - }).start(); + pendingWrapper = new AsyncInPendingWrapper(out); } @Override @@ -207,12 +232,41 @@ public class WindowAdjustTest extends BaseTestSupport { @Override public void start(Environment env) throws IOException { - // ignored + log.info("Starting"); + + ExecutorService service 
= ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName() + "-" + poolCount.incrementAndGet()); + executorHolder.set(service); + + futureHolder.set(service.submit(new Runnable() { + @SuppressWarnings("synthetic-access") + @Override + public void run() { + log.info("Start heavy load sending " + sendCount + " messages of " + msg.length + " bytes"); + for (int i = 0; i < sendCount; i++) { + pendingWrapper.write(new ByteArrayBuffer(msg)); + } + log.info("Sending EOF signal"); + pendingWrapper.write(new ByteArrayBuffer(new byte[]{eofSignal})); + } + })); + log.info("Started"); } @Override public void destroy() { - // ignored + log.info("Destroying"); + + Future<?> future = futureHolder.getAndSet(null); + if ((future != null) && (!future.isDone())) { + log.info("Cancelling"); + future.cancel(true); + } + + ExecutorService service = executorHolder.getAndSet(null); + if ((service != null) && (!service.isShutdown())) { + log.info("Shutdown"); + service.shutdownNow(); + } } } @@ -238,15 +292,14 @@ public class WindowAdjustTest extends BaseTestSupport { } public synchronized void write(final Object msg) { - if (asyncIn != null && !asyncIn.isClosed() && !asyncIn.isClosing()) { - - final Buffer ByteBufferMsg = (Buffer) msg; + if ((asyncIn != null) && (!asyncIn.isClosed()) && (!asyncIn.isClosing())) { + final Buffer byteBufferMsg = (Buffer) msg; if (!pending.isEmpty()) { - queueRequest(ByteBufferMsg); + queueRequest(byteBufferMsg); return; } - writeWithPendingDetection(ByteBufferMsg, false); + writeWithPendingDetection(byteBufferMsg, false); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b0b8d346/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java b/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java index eaa18b9..96dc836 100644 --- 
a/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java @@ -83,6 +83,9 @@ import org.junit.runners.MethodSorters; import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSch; +/** + * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a> + */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class SftpTest extends AbstractSftpClientTestSupport {
