This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 8.5.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/8.5.x by this push: new 259c615a76 Refactor to reduce pinning in HTTP/2 code when using virtual threads 259c615a76 is described below commit 259c615a764ce90a08e9a09c31f49379ce1cdb8b Author: Mark Thomas <ma...@apache.org> AuthorDate: Wed Jul 26 12:36:15 2023 +0100 Refactor to reduce pinning in HTTP/2 code when using virtual threads --- java/org/apache/coyote/http2/Stream.java | 295 +++++++++++++++++-------------- 1 file changed, 164 insertions(+), 131 deletions(-) diff --git a/java/org/apache/coyote/http2/Stream.java b/java/org/apache/coyote/http2/Stream.java index 40ec7319ad..3d2628668a 100644 --- a/java/org/apache/coyote/http2/Stream.java +++ b/java/org/apache/coyote/http2/Stream.java @@ -26,6 +26,8 @@ import java.security.PrivilegedExceptionAction; import java.util.HashSet; import java.util.Locale; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.coyote.ActionCode; import org.apache.coyote.CloseNowException; @@ -868,6 +870,7 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter { class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink { + private final Lock writeLock = new ReentrantLock(); private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024); private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024); // Flag that indicates that data was left over on a previous @@ -890,154 +893,179 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter { */ @Deprecated @Override - public synchronized int doWrite(ByteChunk chunk) throws IOException { - if (closed) { - throw new IllegalStateException(sm.getString("stream.closed", getConnectionId(), getIdentifier())); - } - if (!coyoteResponse.isCommitted()) { - coyoteResponse.sendHeaders(); - } - int len = chunk.getLength(); - int offset = 0; - while (len > 0) { - int thisTime = Math.min(buffer.remaining(), len); - 
buffer.put(chunk.getBytes(), chunk.getOffset() + offset, thisTime); - offset += thisTime; - len -= thisTime; - if (len > 0 && !buffer.hasRemaining()) { - // Only flush if we have more data to write and the buffer - // is full - if (flush(true, coyoteResponse.getWriteListener() == null)) { - break; - } + public int doWrite(ByteChunk chunk) throws IOException { + writeLock.lock(); + try { + if (closed) { + throw new IllegalStateException(sm.getString("stream.closed", getConnectionId(), getIdentifier())); } - } - written += offset; - return offset; - } - - @Override - public final synchronized int doWrite(ByteBuffer chunk) throws IOException { - if (closed) { - throw new IOException(sm.getString("stream.closed", getConnectionId(), getIdAsString())); - } - int totalThisTime = 0; - if (writeBuffer.isEmpty()) { - int chunkLimit = chunk.limit(); - while (chunk.remaining() > 0) { - int thisTime = Math.min(buffer.remaining(), chunk.remaining()); - chunk.limit(chunk.position() + thisTime); - buffer.put(chunk); - chunk.limit(chunkLimit); - totalThisTime += thisTime; - if (chunk.remaining() > 0 && !buffer.hasRemaining()) { + if (!coyoteResponse.isCommitted()) { + coyoteResponse.sendHeaders(); + } + int len = chunk.getLength(); + int offset = 0; + while (len > 0) { + int thisTime = Math.min(buffer.remaining(), len); + buffer.put(chunk.getBytes(), chunk.getOffset() + offset, thisTime); + offset += thisTime; + len -= thisTime; + if (len > 0 && !buffer.hasRemaining()) { // Only flush if we have more data to write and the buffer // is full if (flush(true, coyoteResponse.getWriteListener() == null)) { - totalThisTime = chunk.remaining(); - writeBuffer.add(chunk); - dataLeft = true; break; } } } - } else { - totalThisTime = chunk.remaining(); - writeBuffer.add(chunk); + written += offset; + return offset; + } finally { + writeLock.unlock(); } - written += totalThisTime; - return totalThisTime; } - final synchronized boolean flush(boolean block) throws IOException { - /* - * Need to 
ensure that there is exactly one call to flush even when there is no data to write. Too few calls - * (i.e. zero) and the end of stream message is not sent for a completed asynchronous write. Too many calls - * and the end of stream message is sent too soon and trailer headers are not sent. - */ - boolean dataInBuffer = buffer.position() > 0; - boolean flushed = false; - - if (dataInBuffer) { - dataInBuffer = flush(false, block); - flushed = true; - } - - if (dataInBuffer) { - dataLeft = true; - } else { + @Override + public final int doWrite(ByteBuffer chunk) throws IOException { + writeLock.lock(); + try { + if (closed) { + throw new IOException(sm.getString("stream.closed", getConnectionId(), getIdAsString())); + } + int totalThisTime = 0; if (writeBuffer.isEmpty()) { - // Both buffer and writeBuffer are empty. - if (flushed) { - dataLeft = false; - } else { - dataLeft = flush(false, block); + int chunkLimit = chunk.limit(); + while (chunk.remaining() > 0) { + int thisTime = Math.min(buffer.remaining(), chunk.remaining()); + chunk.limit(chunk.position() + thisTime); + buffer.put(chunk); + chunk.limit(chunkLimit); + totalThisTime += thisTime; + if (chunk.remaining() > 0 && !buffer.hasRemaining()) { + // Only flush if we have more data to write and the buffer + // is full + if (flush(true, coyoteResponse.getWriteListener() == null)) { + totalThisTime = chunk.remaining(); + writeBuffer.add(chunk); + dataLeft = true; + break; + } + } } } else { - dataLeft = writeBuffer.write(this, block); + totalThisTime = chunk.remaining(); + writeBuffer.add(chunk); } + written += totalThisTime; + return totalThisTime; + } finally { + writeLock.unlock(); } - - return dataLeft; } - private synchronized boolean flush(boolean writeInProgress, boolean block) throws IOException { - if (log.isDebugEnabled()) { - log.debug(sm.getString("stream.outputBuffer.flush.debug", getConnectionId(), getIdAsString(), - Integer.toString(buffer.position()), Boolean.toString(writeInProgress), - 
Boolean.toString(closed))); - } - if (buffer.position() == 0) { - if (closed && !endOfStreamSent) { - // Handling this special case here is simpler than trying - // to modify the following code to handle it. - handler.writeBody(Stream.this, buffer, 0, true); + final boolean flush(boolean block) throws IOException { + writeLock.lock(); + try { + /* + * Need to ensure that there is exactly one call to flush even when there is no data to write. Too few calls + * (i.e. zero) and the end of stream message is not sent for a completed asynchronous write. Too many calls + * and the end of stream message is sent too soon and trailer headers are not sent. + */ + boolean dataInBuffer = buffer.position() > 0; + boolean flushed = false; + + if (dataInBuffer) { + dataInBuffer = flush(false, block); + flushed = true; } - // Buffer is empty. Nothing to do. - return false; + + if (dataInBuffer) { + dataLeft = true; + } else { + if (writeBuffer.isEmpty()) { + // Both buffer and writeBuffer are empty. + if (flushed) { + dataLeft = false; + } else { + dataLeft = flush(false, block); + } + } else { + dataLeft = writeBuffer.write(this, block); + } + } + + return dataLeft; + } finally { + writeLock.unlock(); } - buffer.flip(); - int left = buffer.remaining(); - while (left > 0) { - if (streamReservation == 0) { - streamReservation = reserveWindowSize(left, block); - if (streamReservation == 0) { - // Must be non-blocking. - // Note: Can't add to the writeBuffer here as the write - // may originate from the writeBuffer. 
- buffer.compact(); - return true; + } + + private boolean flush(boolean writeInProgress, boolean block) throws IOException { + writeLock.lock(); + try { + if (log.isDebugEnabled()) { + log.debug(sm.getString("stream.outputBuffer.flush.debug", getConnectionId(), getIdAsString(), + Integer.toString(buffer.position()), Boolean.toString(writeInProgress), + Boolean.toString(closed))); + } + if (buffer.position() == 0) { + if (closed && !endOfStreamSent) { + // Handling this special case here is simpler than trying + // to modify the following code to handle it. + handler.writeBody(Stream.this, buffer, 0, true); } + // Buffer is empty. Nothing to do. + return false; } - while (streamReservation > 0) { - int connectionReservation = handler.reserveWindowSize(Stream.this, streamReservation, block); - if (connectionReservation == 0) { - // Must be non-blocking. - // Note: Can't add to the writeBuffer here as the write - // may originate from the writeBuffer. - buffer.compact(); - return true; + buffer.flip(); + int left = buffer.remaining(); + while (left > 0) { + if (streamReservation == 0) { + streamReservation = reserveWindowSize(left, block); + if (streamReservation == 0) { + // Must be non-blocking. + // Note: Can't add to the writeBuffer here as the write + // may originate from the writeBuffer. + buffer.compact(); + return true; + } + } + while (streamReservation > 0) { + int connectionReservation = handler.reserveWindowSize(Stream.this, streamReservation, block); + if (connectionReservation == 0) { + // Must be non-blocking. + // Note: Can't add to the writeBuffer here as the write + // may originate from the writeBuffer. 
+ buffer.compact(); + return true; + } + // Do the write + handler.writeBody(Stream.this, buffer, connectionReservation, + !writeInProgress && closed && left == connectionReservation); + streamReservation -= connectionReservation; + left -= connectionReservation; } - // Do the write - handler.writeBody(Stream.this, buffer, connectionReservation, - !writeInProgress && closed && left == connectionReservation); - streamReservation -= connectionReservation; - left -= connectionReservation; } + buffer.clear(); + return false; + } finally { + writeLock.unlock(); } - buffer.clear(); - return false; } - final synchronized boolean isReady() { - // Bug 63682 - // Only want to return false if the window size is zero AND we are - // already waiting for an allocation. - if (getWindowSize() > 0 && allocationManager.isWaitingForStream() || - handler.getWindowSize() > 0 && allocationManager.isWaitingForConnection() || dataLeft) { - return false; - } else { - return true; + final boolean isReady() { + writeLock.lock(); + try { + // Bug 63682 + // Only want to return false if the window size is zero AND we are + // already waiting for an allocation. 
+ if (getWindowSize() > 0 && allocationManager.isWaitingForStream() || + handler.getWindowSize() > 0 && allocationManager.isWaitingForConnection() || dataLeft) { + return false; + } else { + return true; + } + } finally { + writeLock.unlock(); } } @@ -1076,18 +1104,23 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter { } @Override - public synchronized boolean writeFromBuffer(ByteBuffer src, boolean blocking) throws IOException { - int chunkLimit = src.limit(); - while (src.remaining() > 0) { - int thisTime = Math.min(buffer.remaining(), src.remaining()); - src.limit(src.position() + thisTime); - buffer.put(src); - src.limit(chunkLimit); - if (flush(false, blocking)) { - return true; + public boolean writeFromBuffer(ByteBuffer src, boolean blocking) throws IOException { + writeLock.lock(); + try { + int chunkLimit = src.limit(); + while (src.remaining() > 0) { + int thisTime = Math.min(buffer.remaining(), src.remaining()); + src.limit(src.position() + thisTime); + buffer.put(src); + src.limit(chunkLimit); + if (flush(false, blocking)) { + return true; + } } + return false; + } finally { + writeLock.unlock(); } - return false; } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org