Repository: mina-sshd Updated Branches: refs/heads/master 3243c91df -> 4b19c2a53
[SSHD-565] Require a timeout value on Window's waitForSpace and waitAndConsume methods Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/4b19c2a5 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/4b19c2a5 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/4b19c2a5 Branch: refs/heads/master Commit: 4b19c2a5310aa5c7120395ae015922ce1cbf256e Parents: 3243c91 Author: Lyor Goldstein <[email protected]> Authored: Mon Sep 21 08:17:26 2015 +0300 Committer: Lyor Goldstein <[email protected]> Committed: Mon Sep 21 08:17:26 2015 +0300 ---------------------------------------------------------------------- .../apache/sshd/common/FactoryManagerUtils.java | 75 +++++++++- .../common/channel/ChannelOutputStream.java | 29 +++- .../org/apache/sshd/common/channel/Window.java | 138 +++++++++++------- .../org/apache/sshd/common/util/Predicate.java | 28 ++++ .../sshd/common/channel/WindowTimeoutTest.java | 142 +++++++++++++++++++ .../git/transport/GitSshdSessionFactory.java | 2 +- 6 files changed, 346 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerUtils.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerUtils.java index 680367a..d62c4c1 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerUtils.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerUtils.java @@ -22,6 +22,7 @@ package org.apache.sshd.common; import java.util.Map; import java.util.Objects; +import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.GenericUtils; @@ -35,6 +36,18 @@ public final class FactoryManagerUtils { } /** + * @param channel The {@link Channel} instance + * @param name The property name + * @param defaultValue The default value to return if the specified property + * does not exist in the properties map + * @return The resolved property + * @throws NumberFormatException if malformed value + */ + public static long getLongProperty(Channel channel, String name, long defaultValue) { + return getLongProperty(channel.getSession(), name, defaultValue); + } + + /** * @param session The {@link Session} instance * @param name The property name * @param defaultValue The default value to return if the specified property @@ -78,6 +91,16 @@ public final class FactoryManagerUtils { } /** + * @param channel The {@link Channel} instance + * @param name The property name + * @return The {@link Long} value or {@code null} if property not found or empty string + * @throws NumberFormatException if malformed value + */ + public static Long getLong(Channel channel, String name) { + return getLong(channel.getSession(), name); + } + + /** * @param session The {@link Session} instance * @param name The property name * @return The {@link Long} value or {@code null} if property not found or empty string @@ -115,8 +138,12 @@ public final class FactoryManagerUtils { } } + public static Object updateProperty(Channel channel, String name, long value) { + return updateProperty(channel.getSession(), name, value); + } + public static Object updateProperty(Session session, String name, long value) { - return updateProperty(session, name, Long.toString(value)); + return updateProperty(session.getFactoryManager(), name, value); } public static Object updateProperty(FactoryManager manager, String name, long value) { @@ -127,6 +154,10 @@ public final class FactoryManagerUtils { return updateProperty(props, name, Long.valueOf(value)); } + public static int getIntProperty(Channel channel, String name, int defaultValue) { + return getIntProperty(channel.getSession(), name, defaultValue); + } + public static int getIntProperty(Session session, String name, int defaultValue) { return getIntProperty(session.getFactoryManager(), name, defaultValue); } @@ -146,6 +177,10 @@ public final class FactoryManagerUtils { } } + public static Integer getInteger(Channel channel, String name) { + return getInteger(channel.getSession(), name); + } + public static Integer getInteger(Session session, String name) { return getInteger(session.getFactoryManager(), name); } @@ -165,6 +200,10 @@ public final class FactoryManagerUtils { } } + public static Object updateProperty(Channel channel, String name, int value) { + return updateProperty(channel.getSession(), name, value); + } + public static Object updateProperty(Session session, String name, int value) { return updateProperty(session.getFactoryManager(), name, value); } @@ -177,6 +216,10 @@ public final class FactoryManagerUtils { return updateProperty(props, name, Integer.valueOf(value)); } + public static boolean getBooleanProperty(Channel channel, String name, boolean defaultValue) { + return getBooleanProperty(channel.getSession(), name, defaultValue); + } + public static boolean getBooleanProperty(Session session, String name, boolean defaultValue) { return getBooleanProperty(session.getFactoryManager(), name, defaultValue); } @@ -194,6 +237,10 @@ public final class FactoryManagerUtils { } } + public static Boolean getBoolean(Channel channel, String name) { + return getBoolean(channel.getSession(), name); + } + public static Boolean getBoolean(Session session, String name) { return getBoolean(session.getFactoryManager(), name); } @@ -213,6 +260,10 @@ public final class FactoryManagerUtils { } } + public static Object updateProperty(Channel channel, String name, boolean value) { + return updateProperty(channel.getSession(), name, value); + } + public static Object updateProperty(Session session, String name, boolean value) { return updateProperty(session.getFactoryManager(), name, value); } @@ -225,16 +276,24 @@ public final class FactoryManagerUtils { return updateProperty(props, name, Boolean.valueOf(value)); } - public static String getString(Session session, String name) { - return getStringProperty(session, name, null); + public static String getString(Channel channel, String name) { + return getString(channel.getSession(), name); } - public static String getStringProperty(Session session, String name, String defaultValue) { - return getStringProperty(session.getFactoryManager(), name, defaultValue); + public static String getString(Session session, String name) { + return getString(session.getFactoryManager(), name); } public static String getString(FactoryManager manager, String name) { - return getStringProperty(manager, name, null); + return getString(manager.getProperties(), name); + } + + public static String getStringProperty(Channel channel, String name, String defaultValue) { + return getStringProperty(channel.getSession(), name, defaultValue); + } + + public static String getStringProperty(Session session, String name, String defaultValue) { + return getStringProperty(session.getFactoryManager(), name, defaultValue); } public static String getStringProperty(FactoryManager manager, String name, String defaultValue) { @@ -255,6 +314,10 @@ public final class FactoryManagerUtils { } } + public static Object updateProperty(Channel channel, String name, Object value) { + return updateProperty(channel.getSession(), name, value); + } + public static Object updateProperty(Session session, String name, Object value) { return updateProperty(session.getFactoryManager(), name, value); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java index ce2d2ef..a2c4ace 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.nio.channels.Channel; - +import java.util.concurrent.TimeUnit; +import org.apache.sshd.common.FactoryManagerUtils; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; +import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.slf4j.Logger; @@ -34,9 +36,15 @@ import org.slf4j.Logger; * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a> */ public class ChannelOutputStream extends OutputStream implements Channel { + /** + * Configure max. wait time (millis) to wait for space to become available + */ + public static final String WAIT_FOR_SPACE_TIMEOUT = "channel-output-wait-for-space-timeout"; + public static final long DEFAULT_WAIT_FOR_SPACE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L); private final AbstractChannel channel; private final Window remoteWindow; + private final long maxWaitTimeout; private final Logger log; private final byte cmd; private final byte[] b = new byte[1]; @@ -47,9 +55,15 @@ public class ChannelOutputStream extends OutputStream implements Channel { private boolean noDelay; public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, Logger log, byte cmd) { - this.channel = channel; - this.remoteWindow = remoteWindow; - this.log = log; + this(channel, remoteWindow, FactoryManagerUtils.getLongProperty(channel, WAIT_FOR_SPACE_TIMEOUT, DEFAULT_WAIT_FOR_SPACE_TIMEOUT), log, cmd); + } + + public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, long maxWaitTimeout, Logger log, byte cmd) { + this.channel = ValidateUtils.checkNotNull(channel, "No channel"); + this.remoteWindow = ValidateUtils.checkNotNull(remoteWindow, "No remote window"); + ValidateUtils.checkTrue(maxWaitTimeout > 0L, "Non-positive max. wait time: %d", maxWaitTimeout); + this.maxWaitTimeout = maxWaitTimeout; + this.log = ValidateUtils.checkNotNull(log, "No logger"); this.cmd = cmd; newBuffer(0); } @@ -93,7 +107,7 @@ public class ChannelOutputStream extends OutputStream implements Channel { flush(); } else { try { - remoteWindow.waitForSpace(); + remoteWindow.waitForSpace(maxWaitTimeout); } catch (WindowClosedException e) { closed = true; throw e; @@ -123,7 +137,8 @@ public class ChannelOutputStream extends OutputStream implements Channel { while (bufferLength > 0) { Buffer buf = buffer; int total = bufferLength; - int length = Math.min(Math.min(remoteWindow.waitForSpace(), total), remoteWindow.getPacketSize()); + int available = remoteWindow.waitForSpace(maxWaitTimeout); + int length = Math.min(Math.min(available, total), remoteWindow.getPacketSize()); int pos = buf.wpos(); buf.wpos((cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) ? 14 : 10); buf.putInt(length); @@ -137,7 +152,7 @@ public class ChannelOutputStream extends OutputStream implements Channel { bufferLength = leftover; } lastSize = length; - remoteWindow.waitAndConsume(length); + remoteWindow.waitAndConsume(length, maxWaitTimeout); if (log.isDebugEnabled()) { log.debug("Send {} on channel {}", (cmd == SshConstants.SSH_MSG_CHANNEL_DATA) ? "SSH_MSG_CHANNEL_DATA" : "SSH_MSG_CHANNEL_EXTENDED_DATA", http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java index bf3ab51..0094b9d 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java @@ -20,14 +20,17 @@ package org.apache.sshd.common.channel; import java.io.IOException; import java.io.StreamCorruptedException; +import java.net.SocketTimeoutException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.FactoryManagerUtils; import org.apache.sshd.common.session.Session; +import org.apache.sshd.common.util.Predicate; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.logging.AbstractLoggingBean; @@ -41,7 +44,18 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean; * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a> */ public class Window extends AbstractLoggingBean implements java.nio.channels.Channel { - private final AtomicInteger waitingCount = new AtomicInteger(0); + /** + * Default {@link Predicate} used to test if space became available + */ + public static final Predicate<Window> SPACE_AVAILABLE_PREDICATE = new Predicate<Window>() { + @SuppressWarnings("synthetic-access") + @Override + public boolean evaluate(Window input) { + // NOTE: we do not call "getSize()" on purpose in order to avoid the lock + return input.sizeHolder.get() > 0; + } + }; + private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean initialized = new AtomicBoolean(false); private final AtomicInteger sizeHolder = new AtomicInteger(0); @@ -196,34 +210,30 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha } } - public void waitAndConsume(int len) throws InterruptedException, WindowClosedException { + /** + * Waits for enough data to become available to consume the specified size + * + * @param len Size of data to consume + * @param maxWaitTime ax. time (millis) to wait for enough data to become available + * @throws InterruptedException If interrupted while waiting + * @throws WindowClosedException If window closed while waiting + * @throws SocketTimeoutException If timeout expired before enough data became available + * @see #waitForCondition(Predicate, long) + * @see #consume(int) + */ + public void waitAndConsume(final int len, long maxWaitTime) throws InterruptedException, WindowClosedException, SocketTimeoutException { ValidateUtils.checkTrue(len >= 0, "Negative wait consume length: %d", len); checkInitialized("waitAndConsume"); synchronized (lock) { - while (isOpen() && (sizeHolder.get() < len)) { - int waiters = waitingCount.incrementAndGet(); - if (log.isDebugEnabled()) { - log.debug("waitAndConsume({}) - requested={}, available={}, waiters={}", this, len, sizeHolder, waiters); + waitForCondition(new Predicate<Window>() { + @SuppressWarnings("synthetic-access") + @Override + public boolean evaluate(Window input) { + // NOTE: we do not call "getSize()" on purpose in order to avoid the lock + return input.sizeHolder.get() >= len; } - - long nanoStart = System.nanoTime(); - try { - lock.wait(); - } finally { - long nanoEnd = System.nanoTime(); - long nanoDuration = nanoEnd - nanoStart; - waiters = waitingCount.decrementAndGet(); - if (log.isTraceEnabled()) { - log.debug("waitAndConsume({}) - requested={}, available={}, waiters={} - ended after {} nanos", - this, len, sizeHolder, waiters, nanoDuration); - } - } - } - - if (!isOpen()) { - throw new WindowClosedException(toString()); - } + }, maxWaitTime); if (log.isDebugEnabled()) { log.debug("waitAndConsume({}) - requested={}, available={}", this, len, sizeHolder); @@ -234,44 +244,70 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha } /** - * Waits (forever) until some data becomes available + * Waits until some data becomes available or timeout expires * + * @param maxWaitTime Max. time (millis) to wait for space to become available * @return Amount of available data - always positive * @throws InterruptedException If interrupted while waiting * @throws WindowClosedException If window closed while waiting + * @throws SocketTimeoutException If timeout expired before space became available + * @see #waitForCondition(Predicate, long) */ - public int waitForSpace() throws InterruptedException, WindowClosedException { + public int waitForSpace(long maxWaitTime) throws InterruptedException, WindowClosedException, SocketTimeoutException { checkInitialized("waitForSpace"); synchronized (lock) { - while (isOpen() && (sizeHolder.get() <= 0)) { - int waiters = waitingCount.incrementAndGet(); - if (log.isDebugEnabled()) { - log.debug("waitForSpace({}) - waiters={}", this, waiters); - } - - long nanoStart = System.nanoTime(); - try { - lock.wait(); - } finally { - long nanoEnd = System.nanoTime(); - long nanoDuration = nanoEnd - nanoStart; - waiters = waitingCount.decrementAndGet(); - if (log.isTraceEnabled()) { - log.debug("waitForSpace({}) - waiters={} - ended after {} nanos", this, waiters, nanoDuration); - } - } + waitForCondition(SPACE_AVAILABLE_PREDICATE, maxWaitTime); + if (log.isDebugEnabled()) { + log.debug("waitForSpace({}) available: {}", this, sizeHolder); } + return sizeHolder.get(); + } + } - if (!isOpen()) { - throw new WindowClosedException(toString()); + /** + * Waits up to a specified amount of time for a condition to be satisfied and + * signaled via the lock. <B>Note:</B> assumes that lock is acquired when this + * method is called. + * + * @param predicate The {@link Predicate} to check if the condition has been + * satisfied - the argument to the predicate is {@code this} reference + * @param maxWaitTime Max. time (millis) to wait for the condition to be satisfied + * @throws WindowClosedException If window closed while waiting + * @throws InterruptedException If interrupted while waiting + * @throws SocketTimeoutException If timeout expired before condition was satisfied + * @see #isOpen() + */ + protected void waitForCondition(Predicate<? super Window> predicate, long maxWaitTime) + throws WindowClosedException, InterruptedException, SocketTimeoutException { + ValidateUtils.checkNotNull(predicate, "No condition"); + ValidateUtils.checkTrue(maxWaitTime > 0, "Non-positive max. wait time: %d", maxWaitTime); + + long maxWaitNanos = TimeUnit.MILLISECONDS.toNanos(maxWaitTime); + long remWaitNanos = maxWaitNanos; + // The loop takes care of spurious wakeups + while (isOpen() && (remWaitNanos > 0L)) { + if (predicate.evaluate(this)) { + return; } - if (log.isDebugEnabled()) { - log.debug("waitForSpace({}) available: {}", this, sizeHolder); + long curWaitMillis = TimeUnit.NANOSECONDS.toMillis(remWaitNanos); + long nanoWaitStart = System.nanoTime(); + if (curWaitMillis > 0L) { + lock.wait(curWaitMillis); + } else { // only nanoseconds remaining + lock.wait(0L, (int) remWaitNanos); } - return sizeHolder.get(); + long nanoWaitEnd = System.nanoTime(); + long nanoWaitDuration = nanoWaitEnd - nanoWaitStart; + remWaitNanos -= nanoWaitDuration; + } + + if (!isOpen()) { + throw new WindowClosedException(toString()); } + + throw new SocketTimeoutException("waitForCondition(" + this + ") timeout exceeded: " + maxWaitTime); } protected void updateSize(int size) { @@ -298,15 +334,9 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha } // just in case someone is still waiting - int waiters; synchronized (lock) { - waiters = waitingCount.get(); lock.notifyAll(); } - - if (log.isDebugEnabled()) { - log.debug("close({}) waiters={}", this, waiters); - } } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-core/src/main/java/org/apache/sshd/common/util/Predicate.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/Predicate.java b/sshd-core/src/main/java/org/apache/sshd/common/util/Predicate.java new file mode 100644 index 0000000..03a4066 --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/util/Predicate.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.common.util; + +/** + * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a> + */ +public interface Predicate<T> { + // TODO replace this with Java-8 predicate when Java-8 becomes min. version for SSHD + boolean evaluate(T input); +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTimeoutTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTimeoutTest.java b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTimeoutTest.java new file mode 100644 index 0000000..03b87b0 --- /dev/null +++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTimeoutTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.common.channel; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.concurrent.TimeUnit; + +import org.apache.sshd.client.future.OpenFuture; +import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.util.test.BaseTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +/** + * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a> + * @see <A HREF="https://issues.apache.org/jira/browse/SSHD-565">SSHD-565</A> + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class WindowTimeoutTest extends BaseTestSupport { + public static final long MAX_WAIT_TIME = TimeUnit.SECONDS.toMillis(2L); + + private AbstractChannel channel; + + public WindowTimeoutTest() { + super(); + } + + @Before + public void setUp() throws Exception { + channel = new AbstractChannel(getCurrentTestName(), true) { + @Override + public OpenFuture open(int recipient, int rwSize, int packetSize, Buffer buffer) { + throw new UnsupportedOperationException(); + } + + @Override + public void handleOpenSuccess(int recipient, int rwSize, int packetSize, Buffer buffer) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void handleOpenFailure(Buffer buffer) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void doWriteExtendedData(byte[] data, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void doWriteData(byte[] data, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + }; + } + + @After + public void tearDown() throws Exception { + if (channel != null) { + channel.close(); + } + } + + @Test + public void testWindowWaitForSpaceTimeout() throws Exception { + try(Window window = channel.getLocalWindow()) { + window.init(AbstractChannel.DEFAULT_WINDOW_SIZE, AbstractChannel.DEFAULT_PACKET_SIZE, null); + window.consume(window.getSize()); + assertEquals("Window not empty", 0, window.getSize()); + + long waitStart = System.nanoTime(); + try { + int len = window.waitForSpace(MAX_WAIT_TIME); + fail("Unexpected timed wait success - len=" + len); + } catch (SocketTimeoutException e) { + long waitEnd = System.nanoTime(); + long waitDuration = TimeUnit.NANOSECONDS.toMillis(waitEnd - waitStart); + // we allow ~100 millis variance to compensate for O/S wait time granularity + assertTrue("Timeout too soon: " + waitDuration, waitDuration >= (MAX_WAIT_TIME - 100L)); + } + + window.close(); + assertFalse("Window not closed", window.isOpen()); + try { + int len = window.waitForSpace(MAX_WAIT_TIME); + fail("Unexpected closed wait success - len=" + len); + } catch (WindowClosedException e) { + // expected + } + } + } + + @Test + public void testWindowWaitAndConsumeTimeout() throws Exception { + try(Window window = channel.getLocalWindow()) { + window.init(AbstractChannel.DEFAULT_WINDOW_SIZE, AbstractChannel.DEFAULT_PACKET_SIZE, null); + + long waitStart = System.nanoTime(); + try + { + window.waitAndConsume(2 * window.getSize(), MAX_WAIT_TIME); + fail("Unexpected timed wait success"); + } catch (SocketTimeoutException e) { + long waitEnd = System.nanoTime(); + long waitDuration = TimeUnit.NANOSECONDS.toMillis(waitEnd - waitStart); + // we allow ~100 millis variance to compensate for O/S wait time granularity + assertTrue("Timeout too soon: " + waitDuration, waitDuration >= (MAX_WAIT_TIME - 100L)); + } + + window.close(); + assertFalse("Window not closed", window.isOpen()); + try { + window.waitAndConsume(2 * window.getSize(), MAX_WAIT_TIME); + fail("Unexpected closed wait success"); + } catch (WindowClosedException e) { + // expected + } + } + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/4b19c2a5/sshd-git/src/main/java/org/apache/sshd/git/transport/GitSshdSessionFactory.java ---------------------------------------------------------------------- diff --git a/sshd-git/src/main/java/org/apache/sshd/git/transport/GitSshdSessionFactory.java b/sshd-git/src/main/java/org/apache/sshd/git/transport/GitSshdSessionFactory.java index a3483d4..0e19278 100644 --- a/sshd-git/src/main/java/org/apache/sshd/git/transport/GitSshdSessionFactory.java +++ b/sshd-git/src/main/java/org/apache/sshd/git/transport/GitSshdSessionFactory.java @@ -151,7 +151,7 @@ public class GitSshdSessionFactory extends SshSessionFactory { if (res.contains(ClientChannel.ClientChannelEvent.CLOSED)) { return 0; } else { - return (-1); + return -1; } } @Override
