This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch 10.1.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/10.1.x by this push:
new e633e73d61 Refactor to reduce pinning in HTTP/2 code when using
virtual threads
e633e73d61 is described below
commit e633e73d616196fa26da69458fca26b31c5d436a
Author: Mark Thomas <[email protected]>
AuthorDate: Thu Jul 27 15:37:59 2023 +0100
Refactor to reduce pinning in HTTP/2 code when using virtual threads
---
java/org/apache/coyote/http2/AbstractStream.java | 84 ++++---
.../apache/coyote/http2/Http2UpgradeHandler.java | 241 +++++++++++----------
java/org/apache/coyote/http2/RecycledStream.java | 1 -
java/org/apache/coyote/http2/Stream.java | 84 +++----
.../coyote/http2/WindowAllocationManager.java | 27 ++-
5 files changed, 250 insertions(+), 187 deletions(-)
diff --git a/java/org/apache/coyote/http2/AbstractStream.java
b/java/org/apache/coyote/http2/AbstractStream.java
index f332b8c593..d6fb8d8280 100644
--- a/java/org/apache/coyote/http2/AbstractStream.java
+++ b/java/org/apache/coyote/http2/AbstractStream.java
@@ -16,6 +16,10 @@
*/
package org.apache.coyote.http2;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;
@@ -33,6 +37,8 @@ abstract class AbstractStream {
private final String idAsString;
private long windowSize =
ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE;
+ protected final Lock windowAllocationLock = new ReentrantLock();
+ protected final Condition windowAllocationAvailable =
windowAllocationLock.newCondition();
private volatile int connectionAllocationRequested = 0;
private volatile int connectionAllocationMade = 0;
@@ -59,13 +65,23 @@ abstract class AbstractStream {
}
- final synchronized void setWindowSize(long windowSize) {
- this.windowSize = windowSize;
+ final void setWindowSize(long windowSize) {
+ windowAllocationLock.lock();
+ try {
+ this.windowSize = windowSize;
+ } finally {
+ windowAllocationLock.unlock();
+ }
}
- final synchronized long getWindowSize() {
- return windowSize;
+ final long getWindowSize() {
+ windowAllocationLock.lock();
+ try {
+ return windowSize;
+ } finally {
+ windowAllocationLock.unlock();
+ }
}
@@ -76,37 +92,47 @@ abstract class AbstractStream {
*
* @throws Http2Exception If the window size is now higher than the
maximum allowed
*/
- synchronized void incrementWindowSize(int increment) throws Http2Exception
{
- // No need for overflow protection here.
- // Increment can't be more than Integer.MAX_VALUE and once windowSize
- // goes beyond 2^31-1 an error is triggered.
- windowSize += increment;
-
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("abstractStream.windowSizeInc",
getConnectionId(), getIdAsString(),
- Integer.toString(increment), Long.toString(windowSize)));
- }
+ void incrementWindowSize(int increment) throws Http2Exception {
+ windowAllocationLock.lock();
+ try {
+ // No need for overflow protection here.
+ // Increment can't be more than Integer.MAX_VALUE and once
windowSize
+ // goes beyond 2^31-1 an error is triggered.
+ windowSize += increment;
+
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("abstractStream.windowSizeInc",
getConnectionId(), getIdAsString(),
+ Integer.toString(increment),
Long.toString(windowSize)));
+ }
- if (windowSize > ConnectionSettingsBase.MAX_WINDOW_SIZE) {
- String msg = sm.getString("abstractStream.windowSizeTooBig",
getConnectionId(), identifier,
- Integer.toString(increment), Long.toString(windowSize));
- if (identifier.intValue() == 0) {
- throw new ConnectionException(msg,
Http2Error.FLOW_CONTROL_ERROR);
- } else {
- throw new StreamException(msg, Http2Error.FLOW_CONTROL_ERROR,
identifier.intValue());
+ if (windowSize > ConnectionSettingsBase.MAX_WINDOW_SIZE) {
+ String msg = sm.getString("abstractStream.windowSizeTooBig",
getConnectionId(), identifier,
+ Integer.toString(increment),
Long.toString(windowSize));
+ if (identifier.intValue() == 0) {
+ throw new ConnectionException(msg,
Http2Error.FLOW_CONTROL_ERROR);
+ } else {
+ throw new StreamException(msg,
Http2Error.FLOW_CONTROL_ERROR, identifier.intValue());
+ }
}
+ } finally {
+ windowAllocationLock.unlock();
}
}
- final synchronized void decrementWindowSize(int decrement) {
- // No need for overflow protection here. Decrement can never be larger
- // the Integer.MAX_VALUE and once windowSize goes negative no further
- // decrements are permitted
- windowSize -= decrement;
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("abstractStream.windowSizeDec",
getConnectionId(), getIdAsString(),
- Integer.toString(decrement), Long.toString(windowSize)));
+ final void decrementWindowSize(int decrement) {
+ windowAllocationLock.lock();
+ try {
+ // No need for overflow protection here. Decrement can never be
larger
+ // the Integer.MAX_VALUE and once windowSize goes negative no
further
+ // decrements are permitted
+ windowSize -= decrement;
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("abstractStream.windowSizeDec",
getConnectionId(), getIdAsString(),
+ Integer.toString(decrement),
Long.toString(windowSize)));
+ }
+ } finally {
+ windowAllocationLock.unlock();
}
}
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index 811737bf83..bf8be7cf83 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -929,8 +929,10 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
// Need to be holding the stream lock so releaseBacklog() can't notify
// this thread until after this thread enters wait()
int allocation = 0;
- synchronized (stream) {
- synchronized (this) {
+ stream.windowAllocationLock.lock();
+ try {
+ windowAllocationLock.lock();
+ try {
if (!stream.canWrite()) {
stream.doStreamCancel(
sm.getString("upgradeHandler.stream.notWritable",
stream.getConnectionId(),
@@ -955,6 +957,8 @@ class Http2UpgradeHandler extends AbstractStream implements
InternalHttpUpgradeH
allocation = reservation;
decrementWindowSize(allocation);
}
+ } finally {
+ windowAllocationLock.unlock();
}
if (allocation == 0) {
if (block) {
@@ -1001,18 +1005,19 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
return 0;
}
}
+ } finally {
+ stream.windowAllocationLock.unlock();
}
return allocation;
}
- @SuppressWarnings("sync-override") // notify() needs to be outside sync
- // to avoid deadlock
@Override
protected void incrementWindowSize(int increment) throws Http2Exception {
Set<AbstractStream> streamsToNotify = null;
- synchronized (this) {
+ windowAllocationLock.lock();
+ try {
long windowSize = getWindowSize();
if (windowSize < 1 && windowSize + increment > 0) {
// Connection window is exhausted. Assume there will be streams
@@ -1021,6 +1026,8 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
} else {
super.incrementWindowSize(increment);
}
+ } finally {
+ windowAllocationLock.unlock();
}
if (streamsToNotify != null) {
@@ -1053,134 +1060,144 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
}
- private synchronized Set<AbstractStream> releaseBackLog(int increment)
throws Http2Exception {
- Set<AbstractStream> result = new HashSet<>();
- if (backLogSize < increment) {
- // Can clear the whole backlog
- for (AbstractStream stream : backLogStreams) {
- if (stream.getConnectionAllocationRequested() > 0) {
-
stream.setConnectionAllocationMade(stream.getConnectionAllocationRequested());
- stream.setConnectionAllocationRequested(0);
- result.add(stream);
+ private Set<AbstractStream> releaseBackLog(int increment) throws
Http2Exception {
+ windowAllocationLock.lock();
+ try {
+ Set<AbstractStream> result = new HashSet<>();
+ if (backLogSize < increment) {
+ // Can clear the whole backlog
+ for (AbstractStream stream : backLogStreams) {
+ if (stream.getConnectionAllocationRequested() > 0) {
+
stream.setConnectionAllocationMade(stream.getConnectionAllocationRequested());
+ stream.setConnectionAllocationRequested(0);
+ result.add(stream);
+ }
}
- }
- // Cast is safe due to test above
- int remaining = increment - (int) backLogSize;
- backLogSize = 0;
- super.incrementWindowSize(remaining);
+ // Cast is safe due to test above
+ int remaining = increment - (int) backLogSize;
+ backLogSize = 0;
+ super.incrementWindowSize(remaining);
- backLogStreams.clear();
- } else {
- // Can't clear the whole backlog.
- // Need streams in priority order
- Set<Stream> orderedStreams = new
ConcurrentSkipListSet<>(Comparator.comparingInt(Stream::getUrgency)
-
.thenComparing(Stream::getIncremental).thenComparing(Stream::getIdAsInt));
- orderedStreams.addAll(backLogStreams);
-
- // Iteration 1. Need to work out how much we can clear.
- long urgencyWhereAllocationIsExhausted = 0;
- long requestedAllocationForIncrementalStreams = 0;
- int remaining = increment;
- Iterator<Stream> orderedStreamsIterator =
orderedStreams.iterator();
- while (orderedStreamsIterator.hasNext()) {
- Stream s = orderedStreamsIterator.next();
- if (urgencyWhereAllocationIsExhausted < s.getUrgency()) {
- if (remaining < 1) {
- break;
+ backLogStreams.clear();
+ } else {
+ // Can't clear the whole backlog.
+ // Need streams in priority order
+ Set<Stream> orderedStreams = new
ConcurrentSkipListSet<>(Comparator.comparingInt(Stream::getUrgency)
+
.thenComparing(Stream::getIncremental).thenComparing(Stream::getIdAsInt));
+ orderedStreams.addAll(backLogStreams);
+
+ // Iteration 1. Need to work out how much we can clear.
+ long urgencyWhereAllocationIsExhausted = 0;
+ long requestedAllocationForIncrementalStreams = 0;
+ int remaining = increment;
+ Iterator<Stream> orderedStreamsIterator =
orderedStreams.iterator();
+ while (orderedStreamsIterator.hasNext()) {
+ Stream s = orderedStreamsIterator.next();
+ if (urgencyWhereAllocationIsExhausted < s.getUrgency()) {
+ if (remaining < 1) {
+ break;
+ }
+ requestedAllocationForIncrementalStreams = 0;
}
- requestedAllocationForIncrementalStreams = 0;
- }
- urgencyWhereAllocationIsExhausted = s.getUrgency();
- if (s.getIncremental()) {
- requestedAllocationForIncrementalStreams +=
s.getConnectionAllocationRequested();
- remaining -= s.getConnectionAllocationRequested();
- } else {
- remaining -= s.getConnectionAllocationRequested();
- if (remaining < 1) {
- break;
+ urgencyWhereAllocationIsExhausted = s.getUrgency();
+ if (s.getIncremental()) {
+ requestedAllocationForIncrementalStreams +=
s.getConnectionAllocationRequested();
+ remaining -= s.getConnectionAllocationRequested();
+ } else {
+ remaining -= s.getConnectionAllocationRequested();
+ if (remaining < 1) {
+ break;
+ }
}
}
- }
- // Iteration 2. Allocate.
- // Reset for second iteration
- remaining = increment;
- orderedStreamsIterator = orderedStreams.iterator();
- while (orderedStreamsIterator.hasNext()) {
- Stream s = orderedStreamsIterator.next();
- if (s.getUrgency() < urgencyWhereAllocationIsExhausted) {
- // Can fully allocate
- remaining = allocate(s, remaining);
- result.add(s);
- orderedStreamsIterator.remove();
- backLogStreams.remove(s);
- } else if (requestedAllocationForIncrementalStreams == 0) {
- // Allocation ran out in non-incremental streams so fully
- // allocate in iterator order until allocation is exhausted
- remaining = allocate(s, remaining);
- result.add(s);
- if (s.getConnectionAllocationRequested() == 0) {
- // Fully allocated
+ // Iteration 2. Allocate.
+ // Reset for second iteration
+ remaining = increment;
+ orderedStreamsIterator = orderedStreams.iterator();
+ while (orderedStreamsIterator.hasNext()) {
+ Stream s = orderedStreamsIterator.next();
+ if (s.getUrgency() < urgencyWhereAllocationIsExhausted) {
+ // Can fully allocate
+ remaining = allocate(s, remaining);
+ result.add(s);
orderedStreamsIterator.remove();
backLogStreams.remove(s);
- }
- if (remaining < 1) {
- break;
- }
- } else {
- // Allocation ran out in incremental streams. Distribute
- // remaining allocation between the incremental streams at
- // this urgency level.
- if (s.getUrgency() != urgencyWhereAllocationIsExhausted) {
- break;
- }
+ } else if (requestedAllocationForIncrementalStreams == 0) {
+ // Allocation ran out in non-incremental streams so
fully
+ // allocate in iterator order until allocation is
exhausted
+ remaining = allocate(s, remaining);
+ result.add(s);
+ if (s.getConnectionAllocationRequested() == 0) {
+ // Fully allocated
+ orderedStreamsIterator.remove();
+ backLogStreams.remove(s);
+ }
+ if (remaining < 1) {
+ break;
+ }
+ } else {
+ // Allocation ran out in incremental streams.
Distribute
+ // remaining allocation between the incremental
streams at
+ // this urgency level.
+ if (s.getUrgency() !=
urgencyWhereAllocationIsExhausted) {
+ break;
+ }
- int share = (int) (s.getConnectionAllocationRequested() *
remaining /
- requestedAllocationForIncrementalStreams);
- if (share == 0) {
- share = 1;
- }
- allocate(s, share);
- result.add(s);
- if (s.getConnectionAllocationRequested() == 0) {
- // Fully allocated (unlikely but possible due to
- // rounding if only a few bytes required).
- orderedStreamsIterator.remove();
- backLogStreams.remove(s);
+ int share = (int)
(s.getConnectionAllocationRequested() * remaining /
+ requestedAllocationForIncrementalStreams);
+ if (share == 0) {
+ share = 1;
+ }
+ allocate(s, share);
+ result.add(s);
+ if (s.getConnectionAllocationRequested() == 0) {
+ // Fully allocated (unlikely but possible due to
+ // rounding if only a few bytes required).
+ orderedStreamsIterator.remove();
+ backLogStreams.remove(s);
+ }
}
}
}
+ return result;
+ } finally {
+ windowAllocationLock.unlock();
}
- return result;
}
- private synchronized int allocate(AbstractStream stream, int allocation) {
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("upgradeHandler.allocate.debug",
getConnectionId(), stream.getIdAsString(),
- Integer.toString(allocation)));
- }
+ private int allocate(AbstractStream stream, int allocation) {
+ windowAllocationLock.lock();
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.allocate.debug",
getConnectionId(), stream.getIdAsString(),
+ Integer.toString(allocation)));
+ }
- int leftToAllocate = allocation;
+ int leftToAllocate = allocation;
- if (stream.getConnectionAllocationRequested() > 0) {
- int allocatedThisTime;
- if (allocation >= stream.getConnectionAllocationRequested()) {
- allocatedThisTime = stream.getConnectionAllocationRequested();
- } else {
- allocatedThisTime = allocation;
+ if (stream.getConnectionAllocationRequested() > 0) {
+ int allocatedThisTime;
+ if (allocation >= stream.getConnectionAllocationRequested()) {
+ allocatedThisTime =
stream.getConnectionAllocationRequested();
+ } else {
+ allocatedThisTime = allocation;
+ }
+
stream.setConnectionAllocationRequested(stream.getConnectionAllocationRequested()
- allocatedThisTime);
+
stream.setConnectionAllocationMade(stream.getConnectionAllocationMade() +
allocatedThisTime);
+ leftToAllocate = leftToAllocate - allocatedThisTime;
}
-
stream.setConnectionAllocationRequested(stream.getConnectionAllocationRequested()
- allocatedThisTime);
-
stream.setConnectionAllocationMade(stream.getConnectionAllocationMade() +
allocatedThisTime);
- leftToAllocate = leftToAllocate - allocatedThisTime;
- }
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("upgradeHandler.allocate.left",
getConnectionId(), stream.getIdAsString(),
- Integer.toString(leftToAllocate)));
- }
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.allocate.left",
getConnectionId(), stream.getIdAsString(),
+ Integer.toString(leftToAllocate)));
+ }
- return leftToAllocate;
+ return leftToAllocate;
+ } finally {
+ windowAllocationLock.unlock();
+ }
}
diff --git a/java/org/apache/coyote/http2/RecycledStream.java
b/java/org/apache/coyote/http2/RecycledStream.java
index 143ae1d371..c4c180ac1f 100644
--- a/java/org/apache/coyote/http2/RecycledStream.java
+++ b/java/org/apache/coyote/http2/RecycledStream.java
@@ -40,7 +40,6 @@ class RecycledStream extends AbstractNonZeroStream {
}
- @SuppressWarnings("sync-override")
@Override
void incrementWindowSize(int increment) throws Http2Exception {
// NO-OP
diff --git a/java/org/apache/coyote/http2/Stream.java
b/java/org/apache/coyote/http2/Stream.java
index b0dd8ad844..6b32049b82 100644
--- a/java/org/apache/coyote/http2/Stream.java
+++ b/java/org/apache/coyote/http2/Stream.java
@@ -219,52 +219,62 @@ class Stream extends AbstractNonZeroStream implements
HeaderEmitter {
@Override
- final synchronized void incrementWindowSize(int windowSizeIncrement)
throws Http2Exception {
- // If this is zero then any thread that has been trying to write for
- // this stream will be waiting. Notify that thread it can continue. Use
- // notify all even though only one thread is waiting to be on the safe
- // side.
- boolean notify = getWindowSize() < 1;
- super.incrementWindowSize(windowSizeIncrement);
- if (notify && getWindowSize() > 0) {
- allocationManager.notifyStream();
+ final void incrementWindowSize(int windowSizeIncrement) throws
Http2Exception {
+ windowAllocationLock.lock();
+ try {
+ // If this is zero then any thread that has been trying to write
for
+ // this stream will be waiting. Notify that thread it can
continue. Use
+ // notify all even though only one thread is waiting to be on the
safe
+ // side.
+ boolean notify = getWindowSize() < 1;
+ super.incrementWindowSize(windowSizeIncrement);
+ if (notify && getWindowSize() > 0) {
+ allocationManager.notifyStream();
+ }
+ } finally {
+ windowAllocationLock.unlock();
}
}
- final synchronized int reserveWindowSize(int reservation, boolean block)
throws IOException {
- long windowSize = getWindowSize();
- while (windowSize < 1) {
- if (!canWrite()) {
- throw new CloseNowException(sm.getString("stream.notWritable",
getConnectionId(), getIdAsString()));
- }
- if (block) {
- try {
- long writeTimeout =
handler.getProtocol().getStreamWriteTimeout();
- allocationManager.waitForStream(writeTimeout);
- windowSize = getWindowSize();
- if (windowSize == 0) {
- doStreamCancel(sm.getString("stream.writeTimeout"),
Http2Error.ENHANCE_YOUR_CALM);
+ final int reserveWindowSize(int reservation, boolean block) throws
IOException {
+ windowAllocationLock.lock();
+ try {
+ long windowSize = getWindowSize();
+ while (windowSize < 1) {
+ if (!canWrite()) {
+ throw new
CloseNowException(sm.getString("stream.notWritable", getConnectionId(),
getIdAsString()));
+ }
+ if (block) {
+ try {
+ long writeTimeout =
handler.getProtocol().getStreamWriteTimeout();
+ allocationManager.waitForStream(writeTimeout);
+ windowSize = getWindowSize();
+ if (windowSize == 0) {
+
doStreamCancel(sm.getString("stream.writeTimeout"),
Http2Error.ENHANCE_YOUR_CALM);
+ }
+ } catch (InterruptedException e) {
+ // Possible shutdown / rst or similar. Use an
IOException to
+ // signal to the client that further I/O isn't
possible for this
+ // Stream.
+ throw new IOException(e);
}
- } catch (InterruptedException e) {
- // Possible shutdown / rst or similar. Use an IOException
to
- // signal to the client that further I/O isn't possible
for this
- // Stream.
- throw new IOException(e);
+ } else {
+ allocationManager.waitForStreamNonBlocking();
+ return 0;
}
+ }
+ int allocation;
+ if (windowSize < reservation) {
+ allocation = (int) windowSize;
} else {
- allocationManager.waitForStreamNonBlocking();
- return 0;
+ allocation = reservation;
}
+ decrementWindowSize(allocation);
+ return allocation;
+ } finally {
+ windowAllocationLock.unlock();
}
- int allocation;
- if (windowSize < reservation) {
- allocation = (int) windowSize;
- } else {
- allocation = reservation;
- }
- decrementWindowSize(allocation);
- return allocation;
}
diff --git a/java/org/apache/coyote/http2/WindowAllocationManager.java
b/java/org/apache/coyote/http2/WindowAllocationManager.java
index e784c4083c..811fe1821e 100644
--- a/java/org/apache/coyote/http2/WindowAllocationManager.java
+++ b/java/org/apache/coyote/http2/WindowAllocationManager.java
@@ -129,14 +129,18 @@ class WindowAllocationManager {
private boolean isWaitingFor(int waitTarget) {
- synchronized (stream) {
+ stream.windowAllocationLock.lock();
+ try {
return (waitingFor & waitTarget) > 0;
+ } finally {
+ stream.windowAllocationLock.unlock();
}
}
private void waitFor(int waitTarget, final long timeout) throws
InterruptedException {
- synchronized (stream) {
+ stream.windowAllocationLock.lock();
+ try {
if (waitingFor != NONE) {
throw new
IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise",
stream.getConnectionId(), stream.getIdAsString()));
@@ -148,7 +152,7 @@ class WindowAllocationManager {
// Loop to handle spurious wake-ups
do {
if (timeout < 0) {
- stream.wait();
+ stream.windowAllocationAvailable.await();
} else {
long timeoutRemaining;
if (startNanos == -1) {
@@ -164,15 +168,18 @@ class WindowAllocationManager {
return;
}
}
- stream.wait(timeoutRemaining);
+ stream.windowAllocationAvailable.await(timeoutRemaining,
TimeUnit.MILLISECONDS);
}
} while (waitingFor != NONE);
+ } finally {
+ stream.windowAllocationLock.unlock();
}
}
private void waitForNonBlocking(int waitTarget) {
- synchronized (stream) {
+ stream.windowAllocationLock.lock();
+ try {
if (waitingFor == NONE) {
waitingFor = waitTarget;
} else if (waitingFor == waitTarget) {
@@ -182,14 +189,16 @@ class WindowAllocationManager {
throw new
IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise",
stream.getConnectionId(), stream.getIdAsString()));
}
-
+ } finally {
+ stream.windowAllocationLock.unlock();
}
}
private void notify(int notifyTarget) {
- synchronized (stream) {
+ stream.windowAllocationLock.lock();
+ try {
if (log.isDebugEnabled()) {
log.debug(sm.getString("windowAllocationManager.notify",
stream.getConnectionId(),
stream.getIdAsString(), Integer.toString(waitingFor),
Integer.toString(notifyTarget)));
@@ -210,7 +219,7 @@ class WindowAllocationManager {
log.debug(sm.getString("windowAllocationManager.notified",
stream.getConnectionId(),
stream.getIdAsString()));
}
- stream.notify();
+ stream.windowAllocationAvailable.signal();
} else {
// Non-blocking so dispatch
if (log.isDebugEnabled()) {
@@ -225,6 +234,8 @@ class WindowAllocationManager {
}
}
}
+ } finally {
+ stream.windowAllocationLock.unlock();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]