Repository: mina-sshd
Updated Branches:
  refs/heads/master a9d975b6d -> b0b8d3466


[SSHD-569] Reset session idle timeout while waiting for channel window 
availability


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/b0b8d346
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/b0b8d346
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/b0b8d346

Branch: refs/heads/master
Commit: b0b8d34664618d561187996639299eed04867ebf
Parents: a9d975b
Author: Lyor Goldstein <[email protected]>
Authored: Mon Oct 19 07:44:46 2015 +0300
Committer: Lyor Goldstein <[email protected]>
Committed: Mon Oct 19 07:44:46 2015 +0300

----------------------------------------------------------------------
 .../org/apache/sshd/common/FactoryManager.java  |   2 +-
 .../common/channel/ChannelOutputStream.java     |  64 +++++++---
 .../common/channel/ChannelPipedInputStream.java |   1 +
 .../java/org/apache/sshd/WindowAdjustTest.java  | 123 +++++++++++++------
 .../sshd/client/subsystem/sftp/SftpTest.java    |   3 +
 5 files changed, 140 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b0b8d346/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java 
b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index dc0ff3d..14f688c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -111,7 +111,7 @@ public interface FactoryManager extends 
SessionListenerManager, ChannelListenerM
     /**
      * Key used to retrieve the value of idle timeout after which
      * it will close the connection - in milliseconds.
-     * @see #DEFAULT_AUTH_TIMEOUT
+     * @see #DEFAULT_IDLE_TIMEOUT
      */
     String IDLE_TIMEOUT = "idle-timeout";
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b0b8d346/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index 0ba9c39..4f17882 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -23,9 +23,12 @@ import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.nio.channels.Channel;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.slf4j.Logger;
@@ -48,8 +51,8 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
     private final Logger log;
     private final byte cmd;
     private final byte[] b = new byte[1];
+    private final AtomicBoolean closedState = new AtomicBoolean(false);
     private Buffer buffer;
-    private boolean closed;
     private int bufferLength;
     private int lastSize;
     private boolean noDelay;
@@ -78,7 +81,7 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
 
     @Override
     public boolean isOpen() {
-        return !closed;
+        return !closedState.get();
     }
 
     @Override
@@ -90,9 +93,10 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
     @Override
     public synchronized void write(byte[] buf, int s, int l) throws 
IOException {
         if (!isOpen()) {
-            throw new SshException("write(len=" + l + ") channel already 
closed");
+            throw new SshException("write(" + this + ") len=" + l + " - 
channel already closed");
         }
 
+        Session session = channel.getSession();
         while (l > 0) {
             // The maximum amount we should admit without flushing again
             // is enough to make up one full packet within our allowed
@@ -100,21 +104,26 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
             // packet we sent to allow the producer to race ahead and fill
             // out the next packet before we block and wait for space to
             // become available again.
-            //
             int l2 = Math.min(l, Math.min(remoteWindow.getSize() + lastSize, 
remoteWindow.getPacketSize()) - bufferLength);
             if (l2 <= 0) {
                 if (bufferLength > 0) {
                     flush();
                 } else {
+                    session.resetIdleTimeout();
                     try {
                         remoteWindow.waitForSpace(maxWaitTimeout);
                     } catch (WindowClosedException e) {
-                        closed = true;
+                        if (!closedState.getAndSet(true)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("write({})[len={}] closing due to 
window closed", this, l);
+                            }
+                        }
                         throw e;
                     } catch (InterruptedException e) {
-                        throw (IOException) new 
InterruptedIOException("Interrupted while waiting for remote 
space").initCause(e);
+                        throw (IOException) new 
InterruptedIOException("Interrupted while waiting for remote space on write 
len=" + l + " to " + this).initCause(e);
                     }
                 }
+                session.resetIdleTimeout();
                 continue;
             }
             buffer.putRawBytes(buf, s, l2);
@@ -122,19 +131,25 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
             s += l2;
             l -= l2;
         }
-        if (noDelay) {
+
+        if (isNoDelay()) {
             flush();
+        } else {
+            session.resetIdleTimeout();
         }
     }
 
     @Override
     public synchronized void flush() throws IOException {
         if (!isOpen()) {
-            throw new SshException("flush(length=" + bufferLength + ") - 
stream is already closed");
+            throw new SshException("flush(" + this + ") length=" + 
bufferLength + " - stream is already closed");
         }
 
         try {
+            Session session = channel.getSession();
             while (bufferLength > 0) {
+                session.resetIdleTimeout();
+
                 Buffer buf = buffer;
                 int total = bufferLength;
                 int available = remoteWindow.waitForSpace(maxWaitTimeout);
@@ -152,16 +167,22 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
                     bufferLength = leftover;
                 }
                 lastSize = length;
+
+                session.resetIdleTimeout();
                 remoteWindow.waitAndConsume(length, maxWaitTimeout);
-                if (log.isDebugEnabled()) {
-                    log.debug("Send {} on channel {}",
-                            (cmd == SshConstants.SSH_MSG_CHANNEL_DATA) ? 
"SSH_MSG_CHANNEL_DATA" : "SSH_MSG_CHANNEL_EXTENDED_DATA",
-                            Integer.valueOf(channel.getId()));
+                if (log.isTraceEnabled()) {
+                    log.trace("Send {} on channel {}",
+                              (cmd == SshConstants.SSH_MSG_CHANNEL_DATA) ? 
"SSH_MSG_CHANNEL_DATA" : "SSH_MSG_CHANNEL_EXTENDED_DATA",
+                              channel);
                 }
                 channel.writePacket(buf);
             }
         } catch (WindowClosedException e) {
-            closed = true;
+            if (!closedState.getAndSet(true)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("flush({}) closing due to window closed", this);
+                }
+            }
             throw e;
         } catch (Exception e) {
             if (e instanceof IOException) {
@@ -175,16 +196,26 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
     @Override
     public synchronized void close() throws IOException {
         if (isOpen()) {
+            if (log.isTraceEnabled()) {
+                log.trace("close({}) closing", this);
+            }
+
             try {
                 flush();
             } finally {
-                closed = true;
+                closedState.set(true);
             }
         }
     }
 
-    private void newBuffer(int size) {
-        buffer = channel.getSession().createBuffer(cmd, size <= 0 ? 0 : 12 + 
size);
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "[" + channel + "]";
+    }
+
+    protected void newBuffer(int size) {
+        Session session = channel.getSession();
+        buffer = session.createBuffer(cmd, size <= 0 ? 12 : 12 + size);
         buffer.putInt(channel.getRecipient());
         if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
             buffer.putInt(1);
@@ -192,5 +223,4 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
         buffer.putInt(0);
         bufferLength = 0;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b0b8d346/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
index 61c5ec9..c9bcb32 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
@@ -139,6 +139,7 @@ public class ChannelPipedInputStream extends InputStream 
implements ChannelPiped
                     throw (IOException) new 
InterruptedIOException("Interrupted at cycle #" + index + " while waiting for 
data to become available").initCause(e);
                 }
             }
+
             if (len > buffer.available()) {
                 len = buffer.available();
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b0b8d346/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java 
b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
index 22c2533..0380e56 100644
--- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
@@ -21,14 +21,17 @@ package org.apache.sshd;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.EnumSet;
 import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.channel.ClientChannel;
@@ -42,6 +45,8 @@ import org.apache.sshd.common.io.WritePendingException;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.NoCloseOutputStream;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.AsyncCommand;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.Environment;
@@ -54,6 +59,8 @@ import org.junit.Before;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * <p>
@@ -66,7 +73,8 @@ import org.junit.runners.MethodSorters;
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class WindowAdjustTest extends BaseTestSupport {
 
-    public static final String END_FILE = "#";
+    public static final byte END_FILE = '#';
+    public static final int BIG_MSG_SEND_COUNT = 10000;
 
     private SshServer sshServer;
     private int port;
@@ -78,13 +86,13 @@ public class WindowAdjustTest extends BaseTestSupport {
     @Before
     public void setUp() throws Exception {
         sshServer = setupTestServer();
+
         final byte[] msg = Files.readAllBytes(
                 Paths.get(getClass().getResource("/big-msg.txt").toURI()));
-
         sshServer.setShellFactory(new Factory<Command>() {
             @Override
             public Command create() {
-                return new FloodingAsyncCommand(msg, 10000, END_FILE);
+                return new FloodingAsyncCommand(msg, BIG_MSG_SEND_COUNT, 
END_FILE);
             }
         });
 
@@ -101,7 +109,7 @@ public class WindowAdjustTest extends BaseTestSupport {
         }
     }
 
-    @Test(timeout = 5L * 60L * 1000L)
+    @Test(timeout = 6L * 60L * 1000L)
     public void testTrafficHeavyLoad() throws Exception {
 
         try (SshClient client = setupTestClient()) {
@@ -114,10 +122,10 @@ public class WindowAdjustTest extends BaseTestSupport {
                 try (final ClientChannel channel = 
session.createShellChannel()) {
                     channel.setOut(new VerifyingOutputStream(channel, 
END_FILE));
                     channel.setErr(new NoCloseOutputStream(System.err));
-                    channel.open();
+                    channel.open().verify(15L, TimeUnit.SECONDS);
 
                     Collection<ClientChannel.ClientChannelEvent> result =
-                            
channel.waitFor(EnumSet.of(ClientChannel.ClientChannelEvent.CLOSED), 
TimeUnit.SECONDS.toMillis(15L));
+                            
channel.waitFor(EnumSet.of(ClientChannel.ClientChannelEvent.CLOSED), 
TimeUnit.MINUTES.toMillis(2L));
                     assertFalse("Timeout while waiting for channel closure", 
result.contains(ClientChannel.ClientChannelEvent.TIMEOUT));
                 }
             } finally {
@@ -130,33 +138,61 @@ public class WindowAdjustTest extends BaseTestSupport {
      * Read all incoming data and if END_FILE symbol is detected, kill client 
session to end test
      */
     private static class VerifyingOutputStream extends OutputStream {
-
+        private final Logger log;
         private final ClientChannel channel;
-        private String endFile;
+        private final byte eofSignal;
 
-        public VerifyingOutputStream(ClientChannel channel, final String 
lastMsg) {
+        public VerifyingOutputStream(ClientChannel channel, final byte 
eofSignal) {
+            this.log = LoggerFactory.getLogger(getClass());
             this.channel = channel;
-            this.endFile = lastMsg;
+            this.eofSignal = eofSignal;
         }
 
         @Override
         public void write(int b) throws IOException {
-            if (String.valueOf((char) b).equals(endFile)) {
+            if (channel.isClosed() || channel.isClosing()) {
+                throw new IOException("Channel (" + channel + ") is closing / 
closed on write single byte");
+            }
+
+            if (b == (eofSignal & 0xff)) {
+                log.info("Closing channel (" + channel + ") due to single byte 
EOF");
+                channel.close(true);
+            }
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            if (channel.isClosed() || channel.isClosing()) {
+                throw new IOException("Channel (" + channel + ") is closing / 
closed on write " + len + " bytes");
+            }
+
+            if (len <= 0) {
+                return;
+            }
+
+            int lastPos = off + len - 1;
+            if ((b[lastPos] & 0xFF) == (eofSignal & 0xFF)) {
+                log.info("Closing channel (" + channel + ") due to last byte 
EOF");
                 channel.close(true);
             }
         }
     }
 
-    public static final class FloodingAsyncCommand implements AsyncCommand {
+    public static final class FloodingAsyncCommand extends AbstractLoggingBean 
implements AsyncCommand {
+        private static final AtomicInteger poolCount = new AtomicInteger(0);
 
+        private final AtomicReference<ExecutorService> executorHolder = new 
AtomicReference<>();
+        private final AtomicReference<Future<?>> futureHolder = new 
AtomicReference<Future<?>>();
+
+        private AsyncInPendingWrapper pendingWrapper;
         private byte[] msg;
         private int sendCount;
-        private String lastMsg;
+        private byte eofSignal;
 
-        public FloodingAsyncCommand(final byte[] msg, final int sendCount, 
final String lastMsg) {
+        public FloodingAsyncCommand(final byte[] msg, final int sendCount, 
final byte eofSignal) {
             this.msg = msg;
             this.sendCount = sendCount;
-            this.lastMsg = lastMsg;
+            this.eofSignal = eofSignal;
         }
 
         @Override
@@ -166,18 +202,7 @@ public class WindowAdjustTest extends BaseTestSupport {
 
         @Override
         public void setIoOutputStream(IoOutputStream out) {
-            final AsyncInPendingWrapper a = new AsyncInPendingWrapper(out);
-
-            new Thread(new Runnable() {
-                @SuppressWarnings("synthetic-access")
-                @Override
-                public void run() {
-                    for (int i = 0; i < sendCount; i++) {
-                        a.write(new ByteArrayBuffer(msg));
-                    }
-                    a.write(new 
ByteArrayBuffer((lastMsg.getBytes(StandardCharsets.UTF_8))));
-                }
-            }).start();
+            pendingWrapper = new AsyncInPendingWrapper(out);
         }
 
         @Override
@@ -207,12 +232,41 @@ public class WindowAdjustTest extends BaseTestSupport {
 
         @Override
         public void start(Environment env) throws IOException {
-            // ignored
+            log.info("Starting");
+
+            ExecutorService service = 
ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName() + "-" + 
poolCount.incrementAndGet());
+            executorHolder.set(service);
+
+            futureHolder.set(service.submit(new Runnable() {
+                @SuppressWarnings("synthetic-access")
+                @Override
+                public void run() {
+                    log.info("Start heavy load sending " + sendCount + " 
messages of " + msg.length + " bytes");
+                    for (int i = 0; i < sendCount; i++) {
+                        pendingWrapper.write(new ByteArrayBuffer(msg));
+                    }
+                    log.info("Sending EOF signal");
+                    pendingWrapper.write(new ByteArrayBuffer(new 
byte[]{eofSignal}));
+                }
+            }));
+            log.info("Started");
         }
 
         @Override
         public void destroy() {
-            // ignored
+            log.info("Destroying");
+
+            Future<?> future = futureHolder.getAndSet(null);
+            if ((future != null) && (!future.isDone())) {
+                log.info("Cancelling");
+                future.cancel(true);
+            }
+
+            ExecutorService service = executorHolder.getAndSet(null);
+            if ((service != null) && (!service.isShutdown())) {
+                log.info("Shutdown");
+                service.shutdownNow();
+            }
         }
     }
 
@@ -238,15 +292,14 @@ public class WindowAdjustTest extends BaseTestSupport {
         }
 
         public synchronized void write(final Object msg) {
-            if (asyncIn != null && !asyncIn.isClosed() && 
!asyncIn.isClosing()) {
-
-                final Buffer ByteBufferMsg = (Buffer) msg;
+            if ((asyncIn != null) && (!asyncIn.isClosed()) && 
(!asyncIn.isClosing())) {
+                final Buffer byteBufferMsg = (Buffer) msg;
                 if (!pending.isEmpty()) {
-                    queueRequest(ByteBufferMsg);
+                    queueRequest(byteBufferMsg);
                     return;
                 }
 
-                writeWithPendingDetection(ByteBufferMsg, false);
+                writeWithPendingDetection(byteBufferMsg, false);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b0b8d346/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java 
b/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
index eaa18b9..96dc836 100644
--- 
a/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
+++ 
b/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
@@ -83,6 +83,9 @@ import org.junit.runners.MethodSorters;
 import com.jcraft.jsch.ChannelSftp;
 import com.jcraft.jsch.JSch;
 
+/**
+ * @author <a href="mailto:[email protected]";>Apache MINA SSHD Project</a>
+ */
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class SftpTest extends AbstractSftpClientTestSupport {
 

Reply via email to