This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/8.5.x by this push:
new 259c615a76 Refactor to reduce pinning in HTTP/2 code when using
virtual threads
259c615a76 is described below
commit 259c615a764ce90a08e9a09c31f49379ce1cdb8b
Author: Mark Thomas <[email protected]>
AuthorDate: Wed Jul 26 12:36:15 2023 +0100
Refactor to reduce pinning in HTTP/2 code when using virtual threads
---
java/org/apache/coyote/http2/Stream.java | 295 +++++++++++++++++--------------
1 file changed, 164 insertions(+), 131 deletions(-)
diff --git a/java/org/apache/coyote/http2/Stream.java
b/java/org/apache/coyote/http2/Stream.java
index 40ec7319ad..3d2628668a 100644
--- a/java/org/apache/coyote/http2/Stream.java
+++ b/java/org/apache/coyote/http2/Stream.java
@@ -26,6 +26,8 @@ import java.security.PrivilegedExceptionAction;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.coyote.ActionCode;
import org.apache.coyote.CloseNowException;
@@ -868,6 +870,7 @@ class Stream extends AbstractNonZeroStream implements
HeaderEmitter {
class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink {
+ private final Lock writeLock = new ReentrantLock();
private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024);
// Flag that indicates that data was left over on a previous
@@ -890,154 +893,179 @@ class Stream extends AbstractNonZeroStream implements
HeaderEmitter {
*/
@Deprecated
@Override
- public synchronized int doWrite(ByteChunk chunk) throws IOException {
- if (closed) {
- throw new IllegalStateException(sm.getString("stream.closed",
getConnectionId(), getIdentifier()));
- }
- if (!coyoteResponse.isCommitted()) {
- coyoteResponse.sendHeaders();
- }
- int len = chunk.getLength();
- int offset = 0;
- while (len > 0) {
- int thisTime = Math.min(buffer.remaining(), len);
- buffer.put(chunk.getBytes(), chunk.getOffset() + offset,
thisTime);
- offset += thisTime;
- len -= thisTime;
- if (len > 0 && !buffer.hasRemaining()) {
- // Only flush if we have more data to write and the buffer
- // is full
- if (flush(true, coyoteResponse.getWriteListener() ==
null)) {
- break;
- }
+ public int doWrite(ByteChunk chunk) throws IOException {
+ writeLock.lock();
+ try {
+ if (closed) {
+ throw new
IllegalStateException(sm.getString("stream.closed", getConnectionId(),
getIdentifier()));
}
- }
- written += offset;
- return offset;
- }
-
- @Override
- public final synchronized int doWrite(ByteBuffer chunk) throws
IOException {
- if (closed) {
- throw new IOException(sm.getString("stream.closed",
getConnectionId(), getIdAsString()));
- }
- int totalThisTime = 0;
- if (writeBuffer.isEmpty()) {
- int chunkLimit = chunk.limit();
- while (chunk.remaining() > 0) {
- int thisTime = Math.min(buffer.remaining(),
chunk.remaining());
- chunk.limit(chunk.position() + thisTime);
- buffer.put(chunk);
- chunk.limit(chunkLimit);
- totalThisTime += thisTime;
- if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
+ if (!coyoteResponse.isCommitted()) {
+ coyoteResponse.sendHeaders();
+ }
+ int len = chunk.getLength();
+ int offset = 0;
+ while (len > 0) {
+ int thisTime = Math.min(buffer.remaining(), len);
+ buffer.put(chunk.getBytes(), chunk.getOffset() + offset,
thisTime);
+ offset += thisTime;
+ len -= thisTime;
+ if (len > 0 && !buffer.hasRemaining()) {
// Only flush if we have more data to write and the
buffer
// is full
if (flush(true, coyoteResponse.getWriteListener() ==
null)) {
- totalThisTime = chunk.remaining();
- writeBuffer.add(chunk);
- dataLeft = true;
break;
}
}
}
- } else {
- totalThisTime = chunk.remaining();
- writeBuffer.add(chunk);
+ written += offset;
+ return offset;
+ } finally {
+ writeLock.unlock();
}
- written += totalThisTime;
- return totalThisTime;
}
- final synchronized boolean flush(boolean block) throws IOException {
- /*
- * Need to ensure that there is exactly one call to flush even
when there is no data to write. Too few calls
- * (i.e. zero) and the end of stream message is not sent for a
completed asynchronous write. Too many calls
- * and the end of stream message is sent too soon and trailer
headers are not sent.
- */
- boolean dataInBuffer = buffer.position() > 0;
- boolean flushed = false;
-
- if (dataInBuffer) {
- dataInBuffer = flush(false, block);
- flushed = true;
- }
-
- if (dataInBuffer) {
- dataLeft = true;
- } else {
+ @Override
+ public final int doWrite(ByteBuffer chunk) throws IOException {
+ writeLock.lock();
+ try {
+ if (closed) {
+ throw new IOException(sm.getString("stream.closed",
getConnectionId(), getIdAsString()));
+ }
+ int totalThisTime = 0;
if (writeBuffer.isEmpty()) {
- // Both buffer and writeBuffer are empty.
- if (flushed) {
- dataLeft = false;
- } else {
- dataLeft = flush(false, block);
+ int chunkLimit = chunk.limit();
+ while (chunk.remaining() > 0) {
+ int thisTime = Math.min(buffer.remaining(),
chunk.remaining());
+ chunk.limit(chunk.position() + thisTime);
+ buffer.put(chunk);
+ chunk.limit(chunkLimit);
+ totalThisTime += thisTime;
+ if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
+ // Only flush if we have more data to write and
the buffer
+ // is full
+ if (flush(true, coyoteResponse.getWriteListener()
== null)) {
+ totalThisTime = chunk.remaining();
+ writeBuffer.add(chunk);
+ dataLeft = true;
+ break;
+ }
+ }
}
} else {
- dataLeft = writeBuffer.write(this, block);
+ totalThisTime = chunk.remaining();
+ writeBuffer.add(chunk);
}
+ written += totalThisTime;
+ return totalThisTime;
+ } finally {
+ writeLock.unlock();
}
-
- return dataLeft;
}
- private synchronized boolean flush(boolean writeInProgress, boolean
block) throws IOException {
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("stream.outputBuffer.flush.debug",
getConnectionId(), getIdAsString(),
- Integer.toString(buffer.position()),
Boolean.toString(writeInProgress),
- Boolean.toString(closed)));
- }
- if (buffer.position() == 0) {
- if (closed && !endOfStreamSent) {
- // Handling this special case here is simpler than trying
- // to modify the following code to handle it.
- handler.writeBody(Stream.this, buffer, 0, true);
+ final boolean flush(boolean block) throws IOException {
+ writeLock.lock();
+ try {
+ /*
+ * Need to ensure that there is exactly one call to flush even
when there is no data to write. Too few calls
+ * (i.e. zero) and the end of stream message is not sent for a
completed asynchronous write. Too many calls
+ * and the end of stream message is sent too soon and trailer
headers are not sent.
+ */
+ boolean dataInBuffer = buffer.position() > 0;
+ boolean flushed = false;
+
+ if (dataInBuffer) {
+ dataInBuffer = flush(false, block);
+ flushed = true;
}
- // Buffer is empty. Nothing to do.
- return false;
+
+ if (dataInBuffer) {
+ dataLeft = true;
+ } else {
+ if (writeBuffer.isEmpty()) {
+ // Both buffer and writeBuffer are empty.
+ if (flushed) {
+ dataLeft = false;
+ } else {
+ dataLeft = flush(false, block);
+ }
+ } else {
+ dataLeft = writeBuffer.write(this, block);
+ }
+ }
+
+ return dataLeft;
+ } finally {
+ writeLock.unlock();
}
- buffer.flip();
- int left = buffer.remaining();
- while (left > 0) {
- if (streamReservation == 0) {
- streamReservation = reserveWindowSize(left, block);
- if (streamReservation == 0) {
- // Must be non-blocking.
- // Note: Can't add to the writeBuffer here as the write
- // may originate from the writeBuffer.
- buffer.compact();
- return true;
+ }
+
+ private boolean flush(boolean writeInProgress, boolean block) throws
IOException {
+ writeLock.lock();
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("stream.outputBuffer.flush.debug",
getConnectionId(), getIdAsString(),
+ Integer.toString(buffer.position()),
Boolean.toString(writeInProgress),
+ Boolean.toString(closed)));
+ }
+ if (buffer.position() == 0) {
+ if (closed && !endOfStreamSent) {
+ // Handling this special case here is simpler than
trying
+ // to modify the following code to handle it.
+ handler.writeBody(Stream.this, buffer, 0, true);
}
+ // Buffer is empty. Nothing to do.
+ return false;
}
- while (streamReservation > 0) {
- int connectionReservation =
handler.reserveWindowSize(Stream.this, streamReservation, block);
- if (connectionReservation == 0) {
- // Must be non-blocking.
- // Note: Can't add to the writeBuffer here as the write
- // may originate from the writeBuffer.
- buffer.compact();
- return true;
+ buffer.flip();
+ int left = buffer.remaining();
+ while (left > 0) {
+ if (streamReservation == 0) {
+ streamReservation = reserveWindowSize(left, block);
+ if (streamReservation == 0) {
+ // Must be non-blocking.
+ // Note: Can't add to the writeBuffer here as the
write
+ // may originate from the writeBuffer.
+ buffer.compact();
+ return true;
+ }
+ }
+ while (streamReservation > 0) {
+ int connectionReservation =
handler.reserveWindowSize(Stream.this, streamReservation, block);
+ if (connectionReservation == 0) {
+ // Must be non-blocking.
+ // Note: Can't add to the writeBuffer here as the
write
+ // may originate from the writeBuffer.
+ buffer.compact();
+ return true;
+ }
+ // Do the write
+ handler.writeBody(Stream.this, buffer,
connectionReservation,
+ !writeInProgress && closed && left ==
connectionReservation);
+ streamReservation -= connectionReservation;
+ left -= connectionReservation;
}
- // Do the write
- handler.writeBody(Stream.this, buffer,
connectionReservation,
- !writeInProgress && closed && left ==
connectionReservation);
- streamReservation -= connectionReservation;
- left -= connectionReservation;
}
+ buffer.clear();
+ return false;
+ } finally {
+ writeLock.unlock();
}
- buffer.clear();
- return false;
}
- final synchronized boolean isReady() {
- // Bug 63682
- // Only want to return false if the window size is zero AND we are
- // already waiting for an allocation.
- if (getWindowSize() > 0 && allocationManager.isWaitingForStream()
||
- handler.getWindowSize() > 0 &&
allocationManager.isWaitingForConnection() || dataLeft) {
- return false;
- } else {
- return true;
+ final boolean isReady() {
+ writeLock.lock();
+ try {
+ // Bug 63682
+ // Only want to return false if the window size is zero AND we
are
+ // already waiting for an allocation.
+ if (getWindowSize() > 0 &&
allocationManager.isWaitingForStream() ||
+ handler.getWindowSize() > 0 &&
allocationManager.isWaitingForConnection() || dataLeft) {
+ return false;
+ } else {
+ return true;
+ }
+ } finally {
+ writeLock.unlock();
}
}
@@ -1076,18 +1104,23 @@ class Stream extends AbstractNonZeroStream implements
HeaderEmitter {
}
@Override
- public synchronized boolean writeFromBuffer(ByteBuffer src, boolean
blocking) throws IOException {
- int chunkLimit = src.limit();
- while (src.remaining() > 0) {
- int thisTime = Math.min(buffer.remaining(), src.remaining());
- src.limit(src.position() + thisTime);
- buffer.put(src);
- src.limit(chunkLimit);
- if (flush(false, blocking)) {
- return true;
+ public boolean writeFromBuffer(ByteBuffer src, boolean blocking)
throws IOException {
+ writeLock.lock();
+ try {
+ int chunkLimit = src.limit();
+ while (src.remaining() > 0) {
+ int thisTime = Math.min(buffer.remaining(),
src.remaining());
+ src.limit(src.position() + thisTime);
+ buffer.put(src);
+ src.limit(chunkLimit);
+ if (flush(false, blocking)) {
+ return true;
+ }
}
+ return false;
+ } finally {
+ writeLock.unlock();
}
- return false;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]