This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/8.5.x by this push:
     new 259c615a76 Refactor to reduce pinning in HTTP/2 code when using 
virtual threads
259c615a76 is described below

commit 259c615a764ce90a08e9a09c31f49379ce1cdb8b
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Wed Jul 26 12:36:15 2023 +0100

    Refactor to reduce pinning in HTTP/2 code when using virtual threads
---
 java/org/apache/coyote/http2/Stream.java | 295 +++++++++++++++++--------------
 1 file changed, 164 insertions(+), 131 deletions(-)

diff --git a/java/org/apache/coyote/http2/Stream.java 
b/java/org/apache/coyote/http2/Stream.java
index 40ec7319ad..3d2628668a 100644
--- a/java/org/apache/coyote/http2/Stream.java
+++ b/java/org/apache/coyote/http2/Stream.java
@@ -26,6 +26,8 @@ import java.security.PrivilegedExceptionAction;
 import java.util.HashSet;
 import java.util.Locale;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.CloseNowException;
@@ -868,6 +870,7 @@ class Stream extends AbstractNonZeroStream implements 
HeaderEmitter {
 
     class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink {
 
+        private final Lock writeLock = new ReentrantLock();
         private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
         private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024);
         // Flag that indicates that data was left over on a previous
@@ -890,154 +893,179 @@ class Stream extends AbstractNonZeroStream implements 
HeaderEmitter {
          */
         @Deprecated
         @Override
-        public synchronized int doWrite(ByteChunk chunk) throws IOException {
-            if (closed) {
-                throw new IllegalStateException(sm.getString("stream.closed", 
getConnectionId(), getIdentifier()));
-            }
-            if (!coyoteResponse.isCommitted()) {
-                coyoteResponse.sendHeaders();
-            }
-            int len = chunk.getLength();
-            int offset = 0;
-            while (len > 0) {
-                int thisTime = Math.min(buffer.remaining(), len);
-                buffer.put(chunk.getBytes(), chunk.getOffset() + offset, 
thisTime);
-                offset += thisTime;
-                len -= thisTime;
-                if (len > 0 && !buffer.hasRemaining()) {
-                    // Only flush if we have more data to write and the buffer
-                    // is full
-                    if (flush(true, coyoteResponse.getWriteListener() == 
null)) {
-                        break;
-                    }
+        public int doWrite(ByteChunk chunk) throws IOException {
+            writeLock.lock();
+            try {
+                if (closed) {
+                    throw new 
IllegalStateException(sm.getString("stream.closed", getConnectionId(), 
getIdentifier()));
                 }
-            }
-            written += offset;
-            return offset;
-        }
-
-        @Override
-        public final synchronized int doWrite(ByteBuffer chunk) throws 
IOException {
-            if (closed) {
-                throw new IOException(sm.getString("stream.closed", 
getConnectionId(), getIdAsString()));
-            }
-            int totalThisTime = 0;
-            if (writeBuffer.isEmpty()) {
-                int chunkLimit = chunk.limit();
-                while (chunk.remaining() > 0) {
-                    int thisTime = Math.min(buffer.remaining(), 
chunk.remaining());
-                    chunk.limit(chunk.position() + thisTime);
-                    buffer.put(chunk);
-                    chunk.limit(chunkLimit);
-                    totalThisTime += thisTime;
-                    if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
+                if (!coyoteResponse.isCommitted()) {
+                    coyoteResponse.sendHeaders();
+                }
+                int len = chunk.getLength();
+                int offset = 0;
+                while (len > 0) {
+                    int thisTime = Math.min(buffer.remaining(), len);
+                    buffer.put(chunk.getBytes(), chunk.getOffset() + offset, 
thisTime);
+                    offset += thisTime;
+                    len -= thisTime;
+                    if (len > 0 && !buffer.hasRemaining()) {
                         // Only flush if we have more data to write and the 
buffer
                         // is full
                         if (flush(true, coyoteResponse.getWriteListener() == 
null)) {
-                            totalThisTime = chunk.remaining();
-                            writeBuffer.add(chunk);
-                            dataLeft = true;
                             break;
                         }
                     }
                 }
-            } else {
-                totalThisTime = chunk.remaining();
-                writeBuffer.add(chunk);
+                written += offset;
+                return offset;
+            } finally {
+                writeLock.unlock();
             }
-            written += totalThisTime;
-            return totalThisTime;
         }
 
-        final synchronized boolean flush(boolean block) throws IOException {
-            /*
-             * Need to ensure that there is exactly one call to flush even 
when there is no data to write. Too few calls
-             * (i.e. zero) and the end of stream message is not sent for a 
completed asynchronous write. Too many calls
-             * and the end of stream message is sent too soon and trailer 
headers are not sent.
-             */
-            boolean dataInBuffer = buffer.position() > 0;
-            boolean flushed = false;
-
-            if (dataInBuffer) {
-                dataInBuffer = flush(false, block);
-                flushed = true;
-            }
-
-            if (dataInBuffer) {
-                dataLeft = true;
-            } else {
+        @Override
+        public final int doWrite(ByteBuffer chunk) throws IOException {
+            writeLock.lock();
+            try {
+                if (closed) {
+                    throw new IOException(sm.getString("stream.closed", 
getConnectionId(), getIdAsString()));
+                }
+                int totalThisTime = 0;
                 if (writeBuffer.isEmpty()) {
-                    // Both buffer and writeBuffer are empty.
-                    if (flushed) {
-                        dataLeft = false;
-                    } else {
-                        dataLeft = flush(false, block);
+                    int chunkLimit = chunk.limit();
+                    while (chunk.remaining() > 0) {
+                        int thisTime = Math.min(buffer.remaining(), 
chunk.remaining());
+                        chunk.limit(chunk.position() + thisTime);
+                        buffer.put(chunk);
+                        chunk.limit(chunkLimit);
+                        totalThisTime += thisTime;
+                        if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
+                            // Only flush if we have more data to write and 
the buffer
+                            // is full
+                            if (flush(true, coyoteResponse.getWriteListener() 
== null)) {
+                                totalThisTime = chunk.remaining();
+                                writeBuffer.add(chunk);
+                                dataLeft = true;
+                                break;
+                            }
+                        }
                     }
                 } else {
-                    dataLeft = writeBuffer.write(this, block);
+                    totalThisTime = chunk.remaining();
+                    writeBuffer.add(chunk);
                 }
+                written += totalThisTime;
+                return totalThisTime;
+            } finally {
+                writeLock.unlock();
             }
-
-            return dataLeft;
         }
 
-        private synchronized boolean flush(boolean writeInProgress, boolean 
block) throws IOException {
-            if (log.isDebugEnabled()) {
-                log.debug(sm.getString("stream.outputBuffer.flush.debug", 
getConnectionId(), getIdAsString(),
-                        Integer.toString(buffer.position()), 
Boolean.toString(writeInProgress),
-                        Boolean.toString(closed)));
-            }
-            if (buffer.position() == 0) {
-                if (closed && !endOfStreamSent) {
-                    // Handling this special case here is simpler than trying
-                    // to modify the following code to handle it.
-                    handler.writeBody(Stream.this, buffer, 0, true);
+        final boolean flush(boolean block) throws IOException {
+            writeLock.lock();
+            try {
+                /*
+                 * Need to ensure that there is exactly one call to flush even 
when there is no data to write. Too few calls
+                 * (i.e. zero) and the end of stream message is not sent for a 
completed asynchronous write. Too many calls
+                 * and the end of stream message is sent too soon and trailer 
headers are not sent.
+                 */
+                boolean dataInBuffer = buffer.position() > 0;
+                boolean flushed = false;
+
+                if (dataInBuffer) {
+                    dataInBuffer = flush(false, block);
+                    flushed = true;
                 }
-                // Buffer is empty. Nothing to do.
-                return false;
+
+                if (dataInBuffer) {
+                    dataLeft = true;
+                } else {
+                    if (writeBuffer.isEmpty()) {
+                        // Both buffer and writeBuffer are empty.
+                        if (flushed) {
+                            dataLeft = false;
+                        } else {
+                            dataLeft = flush(false, block);
+                        }
+                    } else {
+                        dataLeft = writeBuffer.write(this, block);
+                    }
+                }
+
+                return dataLeft;
+            } finally {
+                writeLock.unlock();
             }
-            buffer.flip();
-            int left = buffer.remaining();
-            while (left > 0) {
-                if (streamReservation == 0) {
-                    streamReservation = reserveWindowSize(left, block);
-                    if (streamReservation == 0) {
-                        // Must be non-blocking.
-                        // Note: Can't add to the writeBuffer here as the write
-                        // may originate from the writeBuffer.
-                        buffer.compact();
-                        return true;
+        }
+
+        private boolean flush(boolean writeInProgress, boolean block) throws 
IOException {
+            writeLock.lock();
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString("stream.outputBuffer.flush.debug", 
getConnectionId(), getIdAsString(),
+                            Integer.toString(buffer.position()), 
Boolean.toString(writeInProgress),
+                            Boolean.toString(closed)));
+                }
+                if (buffer.position() == 0) {
+                    if (closed && !endOfStreamSent) {
+                        // Handling this special case here is simpler than 
trying
+                        // to modify the following code to handle it.
+                        handler.writeBody(Stream.this, buffer, 0, true);
                     }
+                    // Buffer is empty. Nothing to do.
+                    return false;
                 }
-                while (streamReservation > 0) {
-                    int connectionReservation = 
handler.reserveWindowSize(Stream.this, streamReservation, block);
-                    if (connectionReservation == 0) {
-                        // Must be non-blocking.
-                        // Note: Can't add to the writeBuffer here as the write
-                        // may originate from the writeBuffer.
-                        buffer.compact();
-                        return true;
+                buffer.flip();
+                int left = buffer.remaining();
+                while (left > 0) {
+                    if (streamReservation == 0) {
+                        streamReservation = reserveWindowSize(left, block);
+                        if (streamReservation == 0) {
+                            // Must be non-blocking.
+                            // Note: Can't add to the writeBuffer here as the 
write
+                            // may originate from the writeBuffer.
+                            buffer.compact();
+                            return true;
+                        }
+                    }
+                    while (streamReservation > 0) {
+                        int connectionReservation = 
handler.reserveWindowSize(Stream.this, streamReservation, block);
+                        if (connectionReservation == 0) {
+                            // Must be non-blocking.
+                            // Note: Can't add to the writeBuffer here as the 
write
+                            // may originate from the writeBuffer.
+                            buffer.compact();
+                            return true;
+                        }
+                        // Do the write
+                        handler.writeBody(Stream.this, buffer, 
connectionReservation,
+                                !writeInProgress && closed && left == 
connectionReservation);
+                        streamReservation -= connectionReservation;
+                        left -= connectionReservation;
                     }
-                    // Do the write
-                    handler.writeBody(Stream.this, buffer, 
connectionReservation,
-                            !writeInProgress && closed && left == 
connectionReservation);
-                    streamReservation -= connectionReservation;
-                    left -= connectionReservation;
                 }
+                buffer.clear();
+                return false;
+            } finally {
+                writeLock.unlock();
             }
-            buffer.clear();
-            return false;
         }
 
-        final synchronized boolean isReady() {
-            // Bug 63682
-            // Only want to return false if the window size is zero AND we are
-            // already waiting for an allocation.
-            if (getWindowSize() > 0 && allocationManager.isWaitingForStream() 
||
-                    handler.getWindowSize() > 0 && 
allocationManager.isWaitingForConnection() || dataLeft) {
-                return false;
-            } else {
-                return true;
+        final boolean isReady() {
+            writeLock.lock();
+            try {
+                // Bug 63682
+                // Only want to return false if the window size is zero AND we 
are
+                // already waiting for an allocation.
+                if (getWindowSize() > 0 && 
allocationManager.isWaitingForStream() ||
+                        handler.getWindowSize() > 0 && 
allocationManager.isWaitingForConnection() || dataLeft) {
+                    return false;
+                } else {
+                    return true;
+                }
+            } finally {
+                writeLock.unlock();
             }
         }
 
@@ -1076,18 +1104,23 @@ class Stream extends AbstractNonZeroStream implements 
HeaderEmitter {
         }
 
         @Override
-        public synchronized boolean writeFromBuffer(ByteBuffer src, boolean 
blocking) throws IOException {
-            int chunkLimit = src.limit();
-            while (src.remaining() > 0) {
-                int thisTime = Math.min(buffer.remaining(), src.remaining());
-                src.limit(src.position() + thisTime);
-                buffer.put(src);
-                src.limit(chunkLimit);
-                if (flush(false, blocking)) {
-                    return true;
+        public boolean writeFromBuffer(ByteBuffer src, boolean blocking) 
throws IOException {
+            writeLock.lock();
+            try {
+                int chunkLimit = src.limit();
+                while (src.remaining() > 0) {
+                    int thisTime = Math.min(buffer.remaining(), 
src.remaining());
+                    src.limit(src.position() + thisTime);
+                    buffer.put(src);
+                    src.limit(chunkLimit);
+                    if (flush(false, blocking)) {
+                        return true;
+                    }
                 }
+                return false;
+            } finally {
+                writeLock.unlock();
             }
-            return false;
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to