Repository: mina-sshd
Updated Branches:
  refs/heads/master 3243c91df -> 4b19c2a53


[SSHD-565] Require a timeout value on Window's waitForSpace and waitAndConsume 
methods


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/4b19c2a5
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/4b19c2a5
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/4b19c2a5

Branch: refs/heads/master
Commit: 4b19c2a5310aa5c7120395ae015922ce1cbf256e
Parents: 3243c91
Author: Lyor Goldstein <[email protected]>
Authored: Mon Sep 21 08:17:26 2015 +0300
Committer: Lyor Goldstein <[email protected]>
Committed: Mon Sep 21 08:17:26 2015 +0300

----------------------------------------------------------------------
 .../apache/sshd/common/FactoryManagerUtils.java |  75 +++++++++-
 .../common/channel/ChannelOutputStream.java     |  29 +++-
 .../org/apache/sshd/common/channel/Window.java  | 138 +++++++++++-------
 .../org/apache/sshd/common/util/Predicate.java  |  28 ++++
 .../sshd/common/channel/WindowTimeoutTest.java  | 142 +++++++++++++++++++
 .../git/transport/GitSshdSessionFactory.java    |   2 +-
 6 files changed, 346 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerUtils.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerUtils.java 
b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerUtils.java
index 680367a..d62c4c1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerUtils.java
@@ -22,6 +22,7 @@ package org.apache.sshd.common;
 import java.util.Map;
 import java.util.Objects;
 
+import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.GenericUtils;
 
@@ -35,6 +36,18 @@ public final class FactoryManagerUtils {
     }
 
     /**
+     * @param channel      The {@link Channel} instance
+     * @param name         The property name
+     * @param defaultValue The default value to return if the specified 
property
+     *                     does not exist in the properties map
+     * @return The resolved property
+     * @throws NumberFormatException if malformed value
+     */
+    public static long getLongProperty(Channel channel, String name, long 
defaultValue) {
+        return getLongProperty(channel.getSession(), name, defaultValue);
+    }
+
+    /**
      * @param session      The {@link Session} instance
      * @param name         The property name
      * @param defaultValue The default value to return if the specified 
property
@@ -78,6 +91,16 @@ public final class FactoryManagerUtils {
     }
 
     /**
+     * @param channel The {@link Channel} instance
+     * @param name    The property name
+     * @return The {@link Long} value or {@code null} if property not found or 
empty string
+     * @throws NumberFormatException if malformed value
+     */
+    public static Long getLong(Channel channel, String name) {
+        return getLong(channel.getSession(), name);
+    }
+
+    /**
      * @param session The {@link Session} instance
      * @param name    The property name
      * @return The {@link Long} value or {@code null} if property not found or 
empty string
@@ -115,8 +138,12 @@ public final class FactoryManagerUtils {
         }
     }
 
+    public static Object updateProperty(Channel channel, String name, long 
value) {
+        return updateProperty(channel.getSession(), name, value);
+    }
+
     public static Object updateProperty(Session session, String name, long 
value) {
-        return updateProperty(session, name, Long.toString(value));
+        return updateProperty(session.getFactoryManager(), name, value);
     }
 
     public static Object updateProperty(FactoryManager manager, String name, 
long value) {
@@ -127,6 +154,10 @@ public final class FactoryManagerUtils {
         return updateProperty(props, name, Long.valueOf(value));
     }
 
+    public static int getIntProperty(Channel channel, String name, int 
defaultValue) {
+        return getIntProperty(channel.getSession(), name, defaultValue);
+    }
+
     public static int getIntProperty(Session session, String name, int 
defaultValue) {
         return getIntProperty(session.getFactoryManager(), name, defaultValue);
     }
@@ -146,6 +177,10 @@ public final class FactoryManagerUtils {
         }
     }
 
+    public static Integer getInteger(Channel channel, String name) {
+        return getInteger(channel.getSession(), name);
+    }
+
     public static Integer getInteger(Session session, String name) {
         return getInteger(session.getFactoryManager(), name);
     }
@@ -165,6 +200,10 @@ public final class FactoryManagerUtils {
         }
     }
 
+    public static Object updateProperty(Channel channel, String name, int 
value) {
+        return updateProperty(channel.getSession(), name, value);
+    }
+
     public static Object updateProperty(Session session, String name, int 
value) {
         return updateProperty(session.getFactoryManager(), name, value);
     }
@@ -177,6 +216,10 @@ public final class FactoryManagerUtils {
         return updateProperty(props, name, Integer.valueOf(value));
     }
 
+    public static boolean getBooleanProperty(Channel channel, String name, 
boolean defaultValue) {
+        return getBooleanProperty(channel.getSession(), name, defaultValue);
+    }
+
     public static boolean getBooleanProperty(Session session, String name, 
boolean defaultValue) {
         return getBooleanProperty(session.getFactoryManager(), name, 
defaultValue);
     }
@@ -194,6 +237,10 @@ public final class FactoryManagerUtils {
         }
     }
 
+    public static Boolean getBoolean(Channel channel, String name) {
+        return getBoolean(channel.getSession(), name);
+    }
+
     public static Boolean getBoolean(Session session, String name) {
         return getBoolean(session.getFactoryManager(), name);
     }
@@ -213,6 +260,10 @@ public final class FactoryManagerUtils {
         }
     }
 
+    public static Object updateProperty(Channel channel, String name, boolean 
value) {
+        return updateProperty(channel.getSession(), name, value);
+    }
+
     public static Object updateProperty(Session session, String name, boolean 
value) {
         return updateProperty(session.getFactoryManager(), name, value);
     }
@@ -225,16 +276,24 @@ public final class FactoryManagerUtils {
         return updateProperty(props, name, Boolean.valueOf(value));
     }
 
-    public static String getString(Session session, String name) {
-        return getStringProperty(session, name, null);
+    public static String getString(Channel channel, String name) {
+        return getString(channel.getSession(), name);
     }
 
-    public static String getStringProperty(Session session, String name, 
String defaultValue) {
-        return getStringProperty(session.getFactoryManager(), name, 
defaultValue);
+    public static String getString(Session session, String name) {
+        return getString(session.getFactoryManager(), name);
     }
 
     public static String getString(FactoryManager manager, String name) {
-        return getStringProperty(manager, name, null);
+        return getString(manager.getProperties(), name);
+    }
+
+    public static String getStringProperty(Channel channel, String name, 
String defaultValue) {
+        return getStringProperty(channel.getSession(), name, defaultValue);
+    }
+
+    public static String getStringProperty(Session session, String name, 
String defaultValue) {
+        return getStringProperty(session.getFactoryManager(), name, 
defaultValue);
     }
 
     public static String getStringProperty(FactoryManager manager, String 
name, String defaultValue) {
@@ -255,6 +314,10 @@ public final class FactoryManagerUtils {
         }
     }
 
+    public static Object updateProperty(Channel channel, String name, Object 
value) {
+        return updateProperty(channel.getSession(), name, value);
+    }
+
     public static Object updateProperty(Session session, String name, Object 
value) {
         return updateProperty(session.getFactoryManager(), name, value);
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index ce2d2ef..a2c4ace 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.nio.channels.Channel;
-
+import java.util.concurrent.TimeUnit;
+import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.slf4j.Logger;
 
@@ -34,9 +36,15 @@ import org.slf4j.Logger;
  * @author <a href="mailto:[email protected]";>Apache MINA SSHD Project</a>
  */
 public class ChannelOutputStream extends OutputStream implements Channel {
+    /**
+     * Configure max. wait time (millis) to wait for space to become available
+     */
+    public static final String WAIT_FOR_SPACE_TIMEOUT = 
"channel-output-wait-for-space-timeout";
+    public static final long DEFAULT_WAIT_FOR_SPACE_TIMEOUT = 
TimeUnit.SECONDS.toMillis(30L);
 
     private final AbstractChannel channel;
     private final Window remoteWindow;
+    private final long maxWaitTimeout;
     private final Logger log;
     private final byte cmd;
     private final byte[] b = new byte[1];
@@ -47,9 +55,15 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
     private boolean noDelay;
 
     public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, 
Logger log, byte cmd) {
-        this.channel = channel;
-        this.remoteWindow = remoteWindow;
-        this.log = log;
+        this(channel, remoteWindow, 
FactoryManagerUtils.getLongProperty(channel, WAIT_FOR_SPACE_TIMEOUT, 
DEFAULT_WAIT_FOR_SPACE_TIMEOUT), log, cmd);
+    }
+
+    public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, 
long maxWaitTimeout, Logger log, byte cmd) {
+        this.channel = ValidateUtils.checkNotNull(channel, "No channel");
+        this.remoteWindow = ValidateUtils.checkNotNull(remoteWindow, "No 
remote window");
+        ValidateUtils.checkTrue(maxWaitTimeout > 0L, "Non-positive max. wait 
time: %d", maxWaitTimeout);
+        this.maxWaitTimeout = maxWaitTimeout;
+        this.log = ValidateUtils.checkNotNull(log, "No logger");
         this.cmd = cmd;
         newBuffer(0);
     }
@@ -93,7 +107,7 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
                     flush();
                 } else {
                     try {
-                        remoteWindow.waitForSpace();
+                        remoteWindow.waitForSpace(maxWaitTimeout);
                     } catch (WindowClosedException e) {
                         closed = true;
                         throw e;
@@ -123,7 +137,8 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
             while (bufferLength > 0) {
                 Buffer buf = buffer;
                 int total = bufferLength;
-                int length = Math.min(Math.min(remoteWindow.waitForSpace(), 
total), remoteWindow.getPacketSize());
+                int available = remoteWindow.waitForSpace(maxWaitTimeout);
+                int length = Math.min(Math.min(available, total), 
remoteWindow.getPacketSize());
                 int pos = buf.wpos();
                 buf.wpos((cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) ? 
14 : 10);
                 buf.putInt(length);
@@ -137,7 +152,7 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
                     bufferLength = leftover;
                 }
                 lastSize = length;
-                remoteWindow.waitAndConsume(length);
+                remoteWindow.waitAndConsume(length, maxWaitTimeout);
                 if (log.isDebugEnabled()) {
                     log.debug("Send {} on channel {}",
                             (cmd == SshConstants.SSH_MSG_CHANNEL_DATA) ? 
"SSH_MSG_CHANNEL_DATA" : "SSH_MSG_CHANNEL_EXTENDED_DATA",

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
index bf3ab51..0094b9d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
@@ -20,14 +20,17 @@ package org.apache.sshd.common.channel;
 
 import java.io.IOException;
 import java.io.StreamCorruptedException;
+import java.net.SocketTimeoutException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.Predicate;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
 
@@ -41,7 +44,18 @@ import 
org.apache.sshd.common.util.logging.AbstractLoggingBean;
  * @author <a href="mailto:[email protected]";>Apache MINA SSHD Project</a>
  */
 public class Window extends AbstractLoggingBean implements 
java.nio.channels.Channel {
-    private final AtomicInteger waitingCount = new AtomicInteger(0);
+    /**
+     * Default {@link Predicate} used to test if space became available
+     */
+    public static final Predicate<Window> SPACE_AVAILABLE_PREDICATE = new 
Predicate<Window>() {
+        @SuppressWarnings("synthetic-access")
+        @Override
+        public boolean evaluate(Window input) {
+            // NOTE: we do not call "getSize()" on purpose in order to avoid 
the lock
+            return input.sizeHolder.get() > 0;
+        }
+    };
+
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final AtomicBoolean initialized = new AtomicBoolean(false);
     private final AtomicInteger sizeHolder = new AtomicInteger(0);
@@ -196,34 +210,30 @@ public class Window extends AbstractLoggingBean 
implements java.nio.channels.Cha
         }
     }
 
-    public void waitAndConsume(int len) throws InterruptedException, 
WindowClosedException {
+    /**
+     * Waits for enough data to become available to consume the specified size
+     *
+     * @param len Size of data to consume
+     * @param maxWaitTime ax. time (millis) to wait for enough data to become 
available
+     * @throws InterruptedException If interrupted while waiting
+     * @throws WindowClosedException If window closed while waiting
+     * @throws SocketTimeoutException If timeout expired before enough data 
became available
+     * @see #waitForCondition(Predicate, long)
+     * @see #consume(int)
+     */
+    public void waitAndConsume(final int len, long maxWaitTime) throws 
InterruptedException, WindowClosedException, SocketTimeoutException {
         ValidateUtils.checkTrue(len >= 0, "Negative wait consume length: %d", 
len);
         checkInitialized("waitAndConsume");
 
         synchronized (lock) {
-            while (isOpen() && (sizeHolder.get() < len)) {
-                int waiters = waitingCount.incrementAndGet();
-                if (log.isDebugEnabled()) {
-                    log.debug("waitAndConsume({}) - requested={}, 
available={}, waiters={}", this, len, sizeHolder, waiters);
+            waitForCondition(new Predicate<Window>() {
+                @SuppressWarnings("synthetic-access")
+                @Override
+                public boolean evaluate(Window input) {
+                    // NOTE: we do not call "getSize()" on purpose in order to 
avoid the lock
+                    return input.sizeHolder.get() >= len;
                 }
-
-                long nanoStart = System.nanoTime();
-                try {
-                    lock.wait();
-                } finally {
-                    long nanoEnd = System.nanoTime();
-                    long nanoDuration = nanoEnd - nanoStart;
-                    waiters = waitingCount.decrementAndGet();
-                    if (log.isTraceEnabled()) {
-                        log.debug("waitAndConsume({}) - requested={}, 
available={}, waiters={} - ended after {} nanos",
-                                  this, len, sizeHolder, waiters, 
nanoDuration);
-                    }
-                }
-            }
-
-            if (!isOpen()) {
-                throw new WindowClosedException(toString());
-            }
+            }, maxWaitTime);
 
             if (log.isDebugEnabled()) {
                 log.debug("waitAndConsume({}) - requested={}, available={}", 
this, len, sizeHolder);
@@ -234,44 +244,70 @@ public class Window extends AbstractLoggingBean 
implements java.nio.channels.Cha
     }
 
     /**
-     * Waits (forever) until some data becomes available
+     * Waits until some data becomes available or timeout expires
      *
+     * @param maxWaitTime Max. time (millis) to wait for space to become 
available
      * @return Amount of available data - always positive
      * @throws InterruptedException If interrupted while waiting
      * @throws WindowClosedException If window closed while waiting
+     * @throws SocketTimeoutException If timeout expired before space became 
available
+     * @see #waitForCondition(Predicate, long)
      */
-    public int waitForSpace() throws InterruptedException, 
WindowClosedException {
+    public int waitForSpace(long maxWaitTime) throws InterruptedException, 
WindowClosedException, SocketTimeoutException {
         checkInitialized("waitForSpace");
 
         synchronized (lock) {
-            while (isOpen() && (sizeHolder.get() <= 0)) {
-                int waiters = waitingCount.incrementAndGet();
-                if (log.isDebugEnabled()) {
-                    log.debug("waitForSpace({}) - waiters={}", this, waiters);
-                }
-
-                long nanoStart = System.nanoTime();
-                try {
-                    lock.wait();
-                } finally {
-                    long nanoEnd = System.nanoTime();
-                    long nanoDuration = nanoEnd - nanoStart;
-                    waiters = waitingCount.decrementAndGet();
-                    if (log.isTraceEnabled()) {
-                        log.debug("waitForSpace({}) - waiters={} - ended after 
{} nanos", this, waiters, nanoDuration);
-                    }
-                }
+            waitForCondition(SPACE_AVAILABLE_PREDICATE, maxWaitTime);
+            if (log.isDebugEnabled()) {
+                log.debug("waitForSpace({}) available: {}", this, sizeHolder);
             }
+            return sizeHolder.get();
+        }
+    }
 
-            if (!isOpen()) {
-                throw new WindowClosedException(toString());
+    /**
+     * Waits up to a specified amount of time for a condition to be satisfied 
and
+     * signaled via the lock. <B>Note:</B> assumes that lock is acquired when 
this
+     * method is called.
+     *
+     * @param predicate The {@link Predicate} to check if the condition has 
been
+     * satisfied - the argument to the predicate is {@code this} reference
+     * @param maxWaitTime Max. time (millis) to wait for the condition to be 
satisfied
+     * @throws WindowClosedException If window closed while waiting
+     * @throws InterruptedException If interrupted while waiting
+     * @throws SocketTimeoutException If timeout expired before condition was 
satisfied
+     * @see #isOpen()
+     */
+    protected void waitForCondition(Predicate<? super Window> predicate, long 
maxWaitTime)
+            throws WindowClosedException, InterruptedException, 
SocketTimeoutException {
+        ValidateUtils.checkNotNull(predicate, "No condition");
+        ValidateUtils.checkTrue(maxWaitTime > 0, "Non-positive max. wait time: 
%d", maxWaitTime);
+
+        long maxWaitNanos = TimeUnit.MILLISECONDS.toNanos(maxWaitTime);
+        long remWaitNanos = maxWaitNanos;
+        // The loop takes care of spurious wakeups
+        while (isOpen() && (remWaitNanos > 0L)) {
+            if (predicate.evaluate(this)) {
+                return;
             }
 
-            if (log.isDebugEnabled()) {
-                log.debug("waitForSpace({}) available: {}", this, sizeHolder);
+            long curWaitMillis = TimeUnit.NANOSECONDS.toMillis(remWaitNanos);
+            long nanoWaitStart = System.nanoTime();
+            if (curWaitMillis > 0L) {
+                lock.wait(curWaitMillis);
+            } else {    // only nanoseconds remaining
+                lock.wait(0L, (int) remWaitNanos);
             }
-            return sizeHolder.get();
+            long nanoWaitEnd = System.nanoTime();
+            long nanoWaitDuration = nanoWaitEnd - nanoWaitStart;
+            remWaitNanos -= nanoWaitDuration;
+        }
+
+        if (!isOpen()) {
+            throw new WindowClosedException(toString());
         }
+
+        throw new SocketTimeoutException("waitForCondition(" + this + ") 
timeout exceeded: " + maxWaitTime);
     }
 
     protected void updateSize(int size) {
@@ -298,15 +334,9 @@ public class Window extends AbstractLoggingBean implements 
java.nio.channels.Cha
         }
 
         // just in case someone is still waiting
-        int waiters;
         synchronized (lock) {
-            waiters = waitingCount.get();
             lock.notifyAll();
         }
-
-        if (log.isDebugEnabled()) {
-            log.debug("close({}) waiters={}", this, waiters);
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-core/src/main/java/org/apache/sshd/common/util/Predicate.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/Predicate.java 
b/sshd-core/src/main/java/org/apache/sshd/common/util/Predicate.java
new file mode 100644
index 0000000..03a4066
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/Predicate.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.util;
+
+/**
+ * @author <a href="mailto:[email protected]";>Apache MINA SSHD Project</a>
+ */
+public interface Predicate<T> {
+    // TODO replace this with Java-8 predicate when Java-8 becomes min. 
version for SSHD
+    boolean evaluate(T input);
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTimeoutTest.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTimeoutTest.java 
b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTimeoutTest.java
new file mode 100644
index 0000000..03b87b0
--- /dev/null
+++ 
b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTimeoutTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * @author <a href="mailto:[email protected]";>Apache MINA SSHD Project</a>
+ * @see <A HREF="https://issues.apache.org/jira/browse/SSHD-565";>SSHD-565</A>
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class WindowTimeoutTest extends BaseTestSupport {
+    public static final long MAX_WAIT_TIME = TimeUnit.SECONDS.toMillis(2L);
+
+    private AbstractChannel channel;
+
+    public WindowTimeoutTest() {
+        super();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        channel = new AbstractChannel(getCurrentTestName(), true) {
+            @Override
+            public OpenFuture open(int recipient, int rwSize, int packetSize, 
Buffer buffer) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void handleOpenSuccess(int recipient, int rwSize, int 
packetSize, Buffer buffer) throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void handleOpenFailure(Buffer buffer) throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            protected void doWriteExtendedData(byte[] data, int off, int len) 
throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            protected void doWriteData(byte[] data, int off, int len) throws 
IOException {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (channel != null) {
+            channel.close();
+        }
+    }
+
+    @Test
+    public void testWindowWaitForSpaceTimeout() throws Exception {
+        try(Window window = channel.getLocalWindow()) {
+            window.init(AbstractChannel.DEFAULT_WINDOW_SIZE, 
AbstractChannel.DEFAULT_PACKET_SIZE, null);
+            window.consume(window.getSize());
+            assertEquals("Window not empty", 0, window.getSize());
+
+            long waitStart = System.nanoTime();
+            try {
+                int len = window.waitForSpace(MAX_WAIT_TIME);
+                fail("Unexpected timed wait success - len=" + len);
+            } catch (SocketTimeoutException e) {
+                long waitEnd = System.nanoTime();
+                long waitDuration = TimeUnit.NANOSECONDS.toMillis(waitEnd - 
waitStart);
+                // we allow ~100 millis variance to compensate for O/S wait 
time granularity
+                assertTrue("Timeout too soon: " + waitDuration, waitDuration 
>= (MAX_WAIT_TIME - 100L));
+            }
+
+            window.close();
+            assertFalse("Window not closed", window.isOpen());
+            try {
+                int len = window.waitForSpace(MAX_WAIT_TIME);
+                fail("Unexpected closed wait success - len=" + len);
+            } catch (WindowClosedException e) {
+                // expected
+            }
+        }
+    }
+
+    @Test
+    public void testWindowWaitAndConsumeTimeout() throws Exception {
+        try(Window window = channel.getLocalWindow()) {
+            window.init(AbstractChannel.DEFAULT_WINDOW_SIZE, 
AbstractChannel.DEFAULT_PACKET_SIZE, null);
+
+            long waitStart = System.nanoTime();
+            try
+            {
+                window.waitAndConsume(2 * window.getSize(), MAX_WAIT_TIME);
+                fail("Unexpected timed wait success");
+            } catch (SocketTimeoutException e) {
+                long waitEnd = System.nanoTime();
+                long waitDuration = TimeUnit.NANOSECONDS.toMillis(waitEnd - 
waitStart);
+                // we allow ~100 millis variance to compensate for O/S wait 
time granularity
+                assertTrue("Timeout too soon: " + waitDuration, waitDuration 
>= (MAX_WAIT_TIME - 100L));
+            }
+
+            window.close();
+            assertFalse("Window not closed", window.isOpen());
+            try {
+                window.waitAndConsume(2 * window.getSize(), MAX_WAIT_TIME);
+                fail("Unexpected closed wait success");
+            } catch (WindowClosedException e) {
+                // expected
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-git/src/main/java/org/apache/sshd/git/transport/GitSshdSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/sshd-git/src/main/java/org/apache/sshd/git/transport/GitSshdSessionFactory.java
 
b/sshd-git/src/main/java/org/apache/sshd/git/transport/GitSshdSessionFactory.java
index a3483d4..0e19278 100644
--- 
a/sshd-git/src/main/java/org/apache/sshd/git/transport/GitSshdSessionFactory.java
+++ 
b/sshd-git/src/main/java/org/apache/sshd/git/transport/GitSshdSessionFactory.java
@@ -151,7 +151,7 @@ public class GitSshdSessionFactory extends 
SshSessionFactory {
                     if (res.contains(ClientChannel.ClientChannelEvent.CLOSED)) 
{
                         return 0;
                     } else {
-                        return (-1);
+                        return -1;
                     }
                 }
                 @Override

Reply via email to