Repository: mina-sshd
Updated Branches:
  refs/heads/master 6068e0b0b -> 340803bb3


[SSHD-419] Add a configuration entry to automatically set the timeout on 
ChannelPipedInputStream


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/340803bb
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/340803bb
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/340803bb

Branch: refs/heads/master
Commit: 340803bb378e45612071be9895bb4d3aeb1cb474
Parents: 6068e0b
Author: Lyor Goldstein <[email protected]>
Authored: Sun May 31 12:48:53 2015 +0300
Committer: Lyor Goldstein <[email protected]>
Committed: Sun May 31 12:48:53 2015 +0300

----------------------------------------------------------------------
 .../client/channel/AbstractClientChannel.java   |  2 +-
 .../org/apache/sshd/common/FactoryManager.java  |  8 ++++
 .../sshd/common/channel/AbstractChannel.java    |  4 +-
 .../common/channel/ChannelPipedInputStream.java | 25 +++++++----
 .../org/apache/sshd/common/channel/Window.java  | 45 +++++++++++++++++---
 .../common/channel/WindowClosedException.java   |  4 +-
 .../server/channel/AbstractServerChannel.java   |  2 +-
 7 files changed, 67 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/340803bb/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 7481884..1f434b0 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -252,7 +252,7 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
     @Override
     public void handleOpenSuccess(int recipient, int rwsize, int rmpsize, 
Buffer buffer) {
         this.recipient = recipient;
-        this.remoteWindow.init(rwsize, rmpsize);
+        this.remoteWindow.init(rwsize, rmpsize, 
session.getFactoryManager().getProperties());
         try {
             doOpen();
             this.opened.set(true);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/340803bb/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java 
b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 2b70cc6..f8b5508 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -44,6 +44,14 @@ public interface FactoryManager {
     public static final String WINDOW_SIZE = "window-size";
 
     /**
+     * Key used to retrieve timeout (msec.) to wait for data to
+     * become available when reading from a channel. If not set
+     * or non-positive then infinite value is assumed
+     */
+    public static final String WINDOW_TIMEOUT = "window-timeout";
+
+
+    /**
      * Key used to retrieve the value of the maximum packet size
      * in the configuration properties map.
      */

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/340803bb/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index c539f28..6387cac 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -313,9 +313,7 @@ public abstract class AbstractChannel extends 
CloseableUtils.AbstractInnerClosea
     }
 
     protected void configureWindow() {
-        int window = session.getIntProperty(FactoryManager.WINDOW_SIZE, 
DEFAULT_WINDOW_SIZE);
-        int packet = session.getIntProperty(FactoryManager.MAX_PACKET_SIZE, 
DEFAULT_PACKET_SIZE);
-        localWindow.init(window, packet);
+        localWindow.init(session);
     }
 
     protected void sendWindowAdjust(int len) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/340803bb/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
index f9fd7e1..44a4d8d 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
@@ -27,6 +27,10 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerUtils;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 
@@ -36,6 +40,7 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
  * @author <a href="mailto:[email protected]";>Apache MINA SSHD Project</a>
  */
 public class ChannelPipedInputStream extends InputStream implements 
ChannelPipedSink {
+    public static final long DEFAULT_TIMEOUT = 0;    // infinite
 
     private final Window localWindow;
     private final Buffer buffer = new ByteArrayBuffer();
@@ -46,7 +51,7 @@ public class ChannelPipedInputStream extends InputStream 
implements ChannelPiped
     private final Lock lock = new ReentrantLock();
     private final Condition dataAvailable = lock.newCondition();
 
-    private int timeout = 0; // zero is infinite
+    private long timeout;
 
     /**
      * {@link ChannelPipedOutputStream} is already closed and so we will not 
receive additional data.
@@ -56,14 +61,15 @@ public class ChannelPipedInputStream extends InputStream 
implements ChannelPiped
     private boolean writerClosed;
 
     public ChannelPipedInputStream(Window localWindow) {
-        this.localWindow = localWindow;
+        this.localWindow = ValidateUtils.checkNotNull(localWindow, "No local 
window provided", GenericUtils.EMPTY_OBJECT_ARRAY);
+        this.timeout = 
FactoryManagerUtils.getLongProperty(localWindow.getProperties(), 
FactoryManager.WINDOW_TIMEOUT, DEFAULT_TIMEOUT);
     }
 
-    public void setTimeout(int timeout) {
+    public void setTimeout(long timeout) {
         this.timeout = timeout;
     }
 
-    public int getTimeout() {
+    public long getTimeout() {
         return timeout;
     }
 
@@ -97,9 +103,9 @@ public class ChannelPipedInputStream extends InputStream 
implements ChannelPiped
         long startTime = System.currentTimeMillis();
         lock.lock();
         try {
-            for (;;) {
+            for (int index=0;; index++) {
                 if ((closed && writerClosed && eofSent) || (closed && 
!writerClosed)) {
-                    throw new IOException("Pipe closed");
+                    throw new IOException("Pipe closed after " + index + " 
cycles");
                 }
                 if (buffer.available() > 0) {
                     break;
@@ -108,25 +114,26 @@ public class ChannelPipedInputStream extends InputStream 
implements ChannelPiped
                     eofSent = true;
                     return -1; // no more data to read
                 }
+
                 try {
                     if (timeout > 0) {
                         long remaining = timeout - (System.currentTimeMillis() 
- startTime);
                         if (remaining <= 0) {
-                            throw new SocketException("timeout");
+                            throw new SocketException("Timeout (" + timeout + 
") exceeded after " + index + " cycles");
                         }
                         dataAvailable.await(remaining, TimeUnit.MILLISECONDS);
                     } else {
                         dataAvailable.await();
                     }
                 } catch (InterruptedException e) {
-                    throw (IOException) new 
InterruptedIOException("Interrupted while waiting for data to become 
available").initCause(e);
+                    throw (IOException) new 
InterruptedIOException("Interrupted at cycle #" + index + " while waiting for 
data to become available").initCause(e);
                 }
             }
             if (len > buffer.available()) {
                 len = buffer.available();
             }
             buffer.getRawBytes(b, off, len);
-            if (buffer.rpos() > localWindow.getPacketSize() || 
buffer.available() == 0) {
+            if ((buffer.rpos() > localWindow.getPacketSize()) || 
(buffer.available() == 0)) {
                 buffer.compact();
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/340803bb/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
index ba371e4..f166636 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
@@ -19,8 +19,15 @@
 package org.apache.sshd.common.channel;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerUtils;
+import org.apache.sshd.common.Session;
 import org.apache.sshd.common.util.AbstractLoggingBean;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
 
 /**
  * A Window for a given channel.
@@ -41,11 +48,16 @@ public class Window extends AbstractLoggingBean {
     private int packetSize;
     private boolean waiting;
     private boolean closed;
+    private Map<String,?> props = Collections.<String,Object>emptyMap();
 
     public Window(AbstractChannel channel, Object lock, boolean client, 
boolean local) {
-        this.channel = channel;
-        this.lock = lock != null ? lock : this;
-        this.name = (client ? "client" : "server") + " " + (local ? "local " : 
"remote") + " window";
+        this.channel = ValidateUtils.checkNotNull(channel, "No channel 
provided", GenericUtils.EMPTY_OBJECT_ARRAY);
+        this.lock = (lock != null) ? lock : this;
+        this.name = String.valueOf(channel) + ": " + (client ? "client" : 
"server") + " " + (local ? "local " : "remote") + " window";
+    }
+
+    public Map<String,?> getProperties() {
+        return props;
     }
 
     public int getSize() {
@@ -62,11 +74,26 @@ public class Window extends AbstractLoggingBean {
         return packetSize;
     }
 
-    public void init(int size, int packetSize) {
+    public void init(Session session) {
+        init(session.getFactoryManager());
+    }
+    
+    public void init(FactoryManager manager) {
+        init(manager.getProperties());
+    }
+
+    public void init(Map<String,?> props) {
+        init(FactoryManagerUtils.getIntProperty(props, 
FactoryManager.WINDOW_SIZE, AbstractChannel.DEFAULT_WINDOW_SIZE),
+             FactoryManagerUtils.getIntProperty(props, 
FactoryManager.MAX_PACKET_SIZE, AbstractChannel.DEFAULT_PACKET_SIZE),
+             props);
+    }
+
+    public void init(int size, int packetSize, Map<String,?> props) {
         synchronized (lock) {
             this.size = size;
             this.maxSize = size;
             this.packetSize = packetSize;
+            this.props = props;
             lock.notifyAll();
         }
     }
@@ -91,7 +118,6 @@ public class Window extends AbstractLoggingBean {
         }
     }
 
-
     public void consumeAndCheck(int len) throws IOException {
         synchronized (lock) {
             //assert size > len;
@@ -133,7 +159,7 @@ public class Window extends AbstractLoggingBean {
                 waiting = false;
             }
             if (closed) {
-                throw new WindowClosedException();
+                throw new WindowClosedException(name);
             }
             size -= len;
             if (log.isTraceEnabled()) {
@@ -158,7 +184,7 @@ public class Window extends AbstractLoggingBean {
                 waiting = false;
             }
             if (closed) {
-                throw new WindowClosedException();
+                throw new WindowClosedException(name);
             }
             return size;
         }
@@ -172,4 +198,9 @@ public class Window extends AbstractLoggingBean {
             }
         }
     }
+
+    @Override
+    public String toString() {
+        return name;
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/340803bb/sshd-core/src/main/java/org/apache/sshd/common/channel/WindowClosedException.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/WindowClosedException.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/WindowClosedException.java
index b672f50..039a72d 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/WindowClosedException.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/WindowClosedException.java
@@ -28,7 +28,7 @@ import org.apache.sshd.common.SshException;
 public class WindowClosedException extends SshException {
     private static final long serialVersionUID = -5345787686165334234L;
 
-    public WindowClosedException() {
-        super("Already closed");
+    public WindowClosedException(String name) {
+        super("Already closed: " + name);
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/340803bb/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
index 43c9f74..8fc39ec 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
@@ -46,7 +46,7 @@ public abstract class AbstractServerChannel extends 
AbstractChannel {
     @Override
     public OpenFuture open(int recipient, int rwsize, int rmpsize, Buffer 
buffer) {
         this.recipient = recipient;
-        this.remoteWindow.init(rwsize, rmpsize);
+        this.remoteWindow.init(rwsize, rmpsize, 
session.getFactoryManager().getProperties());
         configureWindow();
         return doInit(buffer);
     }

Reply via email to