Author: markt
Date: Thu Jun 25 14:12:08 2015
New Revision: 1687527
URL: http://svn.apache.org/r1687527
Log:
Refactor the backlog/window size syncs after FindBugs highlighted a couple of
timing issues.
Modified:
tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
tomcat/trunk/java/org/apache/coyote/http2/Stream.java
Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1687527&r1=1687526&r2=1687527&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Thu Jun 25
14:12:08 2015
@@ -36,7 +36,6 @@ abstract class AbstractStream {
private volatile AbstractStream parentStream = null;
private final Set<AbstractStream> childStreams = new HashSet<>();
private long windowSize = ConnectionSettings.DEFAULT_WINDOW_SIZE;
- private final Object windowSizeLock = new Object();
public Integer getIdentifier() {
return identifier;
@@ -95,17 +94,13 @@ abstract class AbstractStream {
}
- protected void setWindowSize(long windowSize) {
- synchronized (windowSizeLock) {
- this.windowSize = windowSize;
- }
+ protected synchronized void setWindowSize(long windowSize) {
+ this.windowSize = windowSize;
}
- protected long getWindowSize() {
- synchronized (windowSizeLock) {
- return windowSize;
- }
+ protected synchronized long getWindowSize() {
+ return windowSize;
}
@@ -113,32 +108,28 @@ abstract class AbstractStream {
* @param increment
* @throws Http2Exception
*/
- protected void incrementWindowSize(int increment) throws Http2Exception {
- synchronized (windowSizeLock) {
- // Overflow protection
- if (Long.MAX_VALUE - increment < windowSize) {
- windowSize = Long.MAX_VALUE;
- } else {
- windowSize += increment;
- }
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("abstractStream.windowSizeInc",
getConnectionId(),
- getIdentifier(), Integer.toString(increment),
Long.toString(windowSize)));
- }
+ protected synchronized void incrementWindowSize(int increment) throws
Http2Exception {
+ // Overflow protection
+ if (Long.MAX_VALUE - increment < windowSize) {
+ windowSize = Long.MAX_VALUE;
+ } else {
+ windowSize += increment;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("abstractStream.windowSizeInc",
getConnectionId(),
+ getIdentifier(), Integer.toString(increment),
Long.toString(windowSize)));
}
}
- protected void decrementWindowSize(int decrement) {
+ protected synchronized void decrementWindowSize(int decrement) {
// No need for overflow protection here. Decrement can never be larger
// the Integer.MAX_VALUE and once windowSize goes negative no further
// decrements are permitted
- synchronized (windowSizeLock) {
- windowSize -= decrement;
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("abstractStream.windowSizeDec",
getConnectionId(),
- getIdentifier(), Integer.toString(decrement),
Long.toString(windowSize)));
- }
+ windowSize -= decrement;
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("abstractStream.windowSizeDec",
getConnectionId(),
+ getIdentifier(), Integer.toString(decrement),
Long.toString(windowSize)));
}
}
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1687527&r1=1687526&r2=1687527&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu Jun
25 14:12:08 2015
@@ -124,7 +124,6 @@ public class Http2UpgradeHandler extends
private volatile int maxProcessedStreamId;
// Tracking for when the connection is blocked (windowSize < 1)
- private final Object backLogLock = new Object();
private final Map<AbstractStream,int[]> backLogStreams = new
ConcurrentHashMap<>();
private long backLogSize = 0;
@@ -486,51 +485,64 @@ public class Http2UpgradeHandler extends
}
- int reserveWindowSize(Stream stream, int toWrite) {
- int result;
- synchronized (backLogLock) {
- long windowSize = getWindowSize();
- if (windowSize < 1 || backLogSize > 0) {
- // Has this stream been granted an allocation
- int[] value = backLogStreams.remove(stream);
- if (value != null && value[1] > 0) {
- result = value[1];
- } else {
- value = new int[] { toWrite, 0 };
- backLogStreams.put(stream, value);
- backLogSize += toWrite;
- // Add the parents as well
- AbstractStream parent = stream.getParentStream();
- while (parent != null &&
backLogStreams.putIfAbsent(parent, new int[2]) == null) {
- parent = parent.getParentStream();
+ int reserveWindowSize(Stream stream, int reservation) {
+ // Need to be holding the stream lock so releaseBacklog() can't notify
+ // this thread until after this thread enters wait()
+ int allocation = 0;
+ synchronized (stream) {
+ do {
+ synchronized (this) {
+ long windowSize = getWindowSize();
+ if (windowSize < 1 || backLogSize > 0) {
+ // Has this stream been granted an allocation
+ int[] value = backLogStreams.remove(stream);
+ if (value != null && value[1] > 0) {
+ allocation = value[1];
+ decrementWindowSize(allocation);
+ } else {
+ value = new int[] { reservation, 0 };
+ backLogStreams.put(stream, value);
+ backLogSize += reservation;
+ // Add the parents as well
+ AbstractStream parent = stream.getParentStream();
+ while (parent != null &&
backLogStreams.putIfAbsent(parent, new int[2]) == null) {
+ parent = parent.getParentStream();
+ }
+ }
+ } else if (windowSize < reservation) {
+ allocation = (int) windowSize;
+ decrementWindowSize(allocation);
+ } else {
+ allocation = reservation;
+ decrementWindowSize(allocation);
}
- result = 0;
}
- } else if (windowSize < toWrite) {
- result = (int) windowSize;
- } else {
- result = toWrite;
- }
- decrementWindowSize(result);
+ if (allocation == 0) {
+ try {
+ stream.wait();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ } while (allocation == 0);
}
- return result;
+ return allocation;
}
@Override
- protected void incrementWindowSize(int increment) throws Http2Exception {
- synchronized (backLogLock) {
- long windowSize = getWindowSize();
- if (windowSize < 1 && windowSize + increment > 0) {
- releaseBackLog(increment);
- }
- super.incrementWindowSize(increment);
+ protected synchronized void incrementWindowSize(int increment) throws
Http2Exception {
+ long windowSize = getWindowSize();
+ if (windowSize < 1 && windowSize + increment > 0) {
+ releaseBackLog(increment);
}
+ super.incrementWindowSize(increment);
}
- private void releaseBackLog(int increment) {
+ private synchronized void releaseBackLog(int increment) {
if (backLogSize < increment) {
// Can clear the whole backlog
for (AbstractStream stream : backLogStreams.keySet()) {
Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1687527&r1=1687526&r2=1687527&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Thu Jun 25 14:12:08
2015
@@ -119,28 +119,37 @@ public class Stream extends AbstractStre
@Override
- public void incrementWindowSize(int windowSizeIncrement) throws
Http2Exception {
+ public synchronized void incrementWindowSize(int windowSizeIncrement)
throws Http2Exception {
// If this is zero then any thread that has been trying to write for
// this stream will be waiting. Notify that thread it can continue. Use
// notify all even though only one thread is waiting to be on the safe
// side.
- boolean notify = getWindowSize() == 0;
+ boolean notify = getWindowSize() < 1;
super.incrementWindowSize(windowSizeIncrement);
- if (notify) {
- synchronized (this) {
- notifyAll();
- }
+ if (notify && getWindowSize() > 0) {
+ notifyAll();
}
}
- private int checkWindowSize(int reservation) {
+ private synchronized int reserveWindowSize(int reservation) {
long windowSize = getWindowSize();
- if (reservation > windowSize) {
- return (int) windowSize;
+ while (windowSize < 1) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ }
+ windowSize = getWindowSize();
+ }
+ int allocation;
+ if (windowSize < reservation) {
+ allocation = (int) windowSize;
} else {
- return reservation;
+ allocation = reservation;
}
+ decrementWindowSize(allocation);
+ return allocation;
}
@@ -277,8 +286,13 @@ public class Stream extends AbstractStre
private volatile long written = 0;
private volatile boolean closed = false;
+ /* The write methods are synchronized to ensure that only one thread at
+ * a time is able to access the buffer. Without this protection, a
+ * client that performed concurrent writes could corrupt the buffer.
+ */
+
@Override
- public int doWrite(ByteChunk chunk) throws IOException {
+ public synchronized int doWrite(ByteChunk chunk) throws IOException {
if (closed) {
// TODO i18n
throw new IllegalStateException();
@@ -300,11 +314,11 @@ public class Stream extends AbstractStre
return offset;
}
- public void flush() throws IOException {
+ public synchronized void flush() throws IOException {
flush(false);
}
- private void flush(boolean writeInProgress) throws IOException {
+ private synchronized void flush(boolean writeInProgress) throws
IOException {
if (!coyoteResponse.isCommitted()) {
coyoteResponse.sendHeaders();
}
@@ -314,51 +328,19 @@ public class Stream extends AbstractStre
}
buffer.flip();
int left = buffer.remaining();
- int thisWriteStream;
while (left > 0) {
- // Flow control for the Stream
- do {
- thisWriteStream = checkWindowSize(left);
- if (thisWriteStream < 1) {
- // Need to block until a WindowUpdate message is
- // processed for this stream
- synchronized (Stream.this) {
- try {
- Stream.this.wait();
- } catch (InterruptedException e) {
- // TODO: Possible shutdown?
- }
- }
- }
- } while (thisWriteStream < 1);
-
- // Flow control for the connection
- int thisWrite;
- do {
- thisWrite = handler.reserveWindowSize(Stream.this,
thisWriteStream);
- if (thisWrite < 1) {
- // Need to block until a WindowUpdate message is
- // processed for this connection
- synchronized (Stream.this) {
- try {
- Stream.this.wait();
- } catch (InterruptedException e) {
- // TODO: Possible shutdown?
- }
- }
- }
- } while (thisWrite < 1);
-
- // Stream.checkWindowSize() doesn't reduce the flow control
- // window (reserveWindowSize() does) so the Stream's window
- // needs to be reduced here.
- decrementWindowSize(thisWrite);
-
- // Do the write
- handler.writeBody(Stream.this, buffer, thisWrite,
- !writeInProgress && closed && left == thisWrite);
- left -= thisWrite;
- buffer.position(buffer.position() + thisWrite);
+ int streamReservation = reserveWindowSize(left);
+ while (streamReservation > 0) {
+ int connectionReservation =
+ handler.reserveWindowSize(Stream.this,
streamReservation);
+ // Do the write
+ handler.writeBody(Stream.this, buffer,
connectionReservation,
+ !writeInProgress && closed && left ==
connectionReservation);
+ streamReservation -= connectionReservation;
+ left -= connectionReservation;
+ buffer.position(buffer.position() + connectionReservation);
+
+ }
}
buffer.clear();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]