This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit dee5f2c1f744e789ab3a422de79385222d07ba6e
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Wed Sep 15 14:12:26 2021 +0100

    Refactor allocations for the connection flow control window
    
    There are multiple related changes in this commit:
    - The BackLog tracker object is removed and replaced with fields on
      AbstractStream.
    - If an incomplete allocation is made for a stream as a result of a
      WINDOW_UPDATE frame, the remainder of the request is now discarded.
    - The connection flow control window is, effectively, reduced as soon as
      an allocation is made rather than waiting until after the Stream is
      released. This is an improvement over the previous approach where
      over-allocation was possible.
    - The loop in reserveWindowSize is removed.
---
 java/org/apache/coyote/http2/AbstractStream.java   |  27 ++
 .../apache/coyote/http2/Http2UpgradeHandler.java   | 316 ++++++++-------------
 .../apache/coyote/http2/LocalStrings.properties    |   4 +-
 .../coyote/http2/WindowAllocationManager.java      |   3 +-
 webapps/docs/changelog.xml                         |   6 +
 5 files changed, 154 insertions(+), 202 deletions(-)

diff --git a/java/org/apache/coyote/http2/AbstractStream.java 
b/java/org/apache/coyote/http2/AbstractStream.java
index c7374b6..6a825c2 100644
--- a/java/org/apache/coyote/http2/AbstractStream.java
+++ b/java/org/apache/coyote/http2/AbstractStream.java
@@ -40,6 +40,9 @@ abstract class AbstractStream {
     private final Set<AbstractNonZeroStream> childStreams = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
     private long windowSize = 
ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE;
 
+    private volatile int connectionAllocationRequested = 0;
+    private volatile int connectionAllocationMade = 0;
+
 
     AbstractStream(Integer identifier) {
         this.identifier = identifier;
@@ -154,6 +157,30 @@ abstract class AbstractStream {
     }
 
 
+    final int getConnectionAllocationRequested() {
+        return connectionAllocationRequested;
+    }
+
+
+    final void setConnectionAllocationRequested(int 
connectionAllocationRequested) {
+        
log.debug(sm.getString("abstractStream.setConnectionAllocationRequested", 
getConnectionId(), getIdAsString(),
+                Integer.toString(this.connectionAllocationRequested), 
Integer.toString(connectionAllocationRequested)));
+        this.connectionAllocationRequested = connectionAllocationRequested;
+    }
+
+
+    final int getConnectionAllocationMade() {
+        return connectionAllocationMade;
+    }
+
+
+    final void setConnectionAllocationMade(int connectionAllocationMade) {
+        log.debug(sm.getString("abstractStream.setConnectionAllocationMade", 
getConnectionId(), getIdAsString(),
+                Integer.toString(this.connectionAllocationMade), 
Integer.toString(connectionAllocationMade)));
+        this.connectionAllocationMade = connectionAllocationMade;
+    }
+
+
     abstract String getConnectionId();
 
     abstract int getWeight();
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java 
b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index 045337a..529b4f7 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -20,10 +20,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
@@ -132,7 +131,7 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
     private final AtomicInteger nextLocalStreamId = new AtomicInteger(2);
     private final PingManager pingManager = getPingManager();
     private volatile int newStreamsSinceLastPrune = 0;
-    private final Map<AbstractStream, BacklogTracker> backLogStreams = new 
ConcurrentHashMap<>();
+    private final Set<AbstractStream> backLogStreams = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
     private long backLogSize = 0;
     // The time at which the connection will timeout unless data arrives before
     // then. -1 means no timeout.
@@ -873,101 +872,81 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         // this thread until after this thread enters wait()
         int allocation = 0;
         synchronized (stream) {
-            do {
-                synchronized (this) {
-                    if (!stream.canWrite()) {
-                        
stream.doStreamCancel(sm.getString("upgradeHandler.stream.notWritable",
-                                stream.getConnectionId(), 
stream.getIdAsString()), Http2Error.STREAM_CLOSED);
-                    }
-                    long windowSize = getWindowSize();
-                    if (windowSize < 1 || backLogSize > 0) {
-                        // Has this stream been granted an allocation
-                        BacklogTracker tracker = backLogStreams.get(stream);
-                        if (tracker == null) {
-                            tracker = new BacklogTracker(reservation);
-                            backLogStreams.put(stream, tracker);
-                            backLogSize += reservation;
-                            // Add the parents as well
-                            AbstractStream parent = stream.getParentStream();
-                            while (parent != null && 
backLogStreams.putIfAbsent(parent, new BacklogTracker()) == null) {
-                                parent = parent.getParentStream();
-                            }
-                        } else {
-                            if (tracker.getUnusedAllocation() > 0) {
-                                allocation = tracker.getUnusedAllocation();
-                                decrementWindowSize(allocation);
-                                if (tracker.getRemainingReservation() == 0) {
-                                    // The reservation has been fully allocated
-                                    // so this stream can be removed from the
-                                    // backlog.
-                                    backLogStreams.remove(stream);
-                                } else {
-                                    // This allocation has been used. Leave the
-                                    // stream on the backlog as it still has
-                                    // more bytes to write.
-                                    tracker.useAllocation();
-                                }
-                            }
+            synchronized (this) {
+                if (!stream.canWrite()) {
+                    
stream.doStreamCancel(sm.getString("upgradeHandler.stream.notWritable",
+                            stream.getConnectionId(), stream.getIdAsString()), 
Http2Error.STREAM_CLOSED);
+                }
+                long windowSize = getWindowSize();
+                if (stream.getConnectionAllocationMade() > 0) {
+                    allocation = stream.getConnectionAllocationMade();
+                    stream.setConnectionAllocationMade(0);
+                } else  if (windowSize < 1) {
+                    // Has this stream been granted an allocation
+                    if (stream.getConnectionAllocationMade() == 0) {
+                        stream.setConnectionAllocationRequested(reservation);
+                        backLogSize += reservation;
+                        backLogStreams.add(stream);
+                        // Add the parents as well
+                        AbstractStream parent = stream.getParentStream();
+                        while (parent != null && backLogStreams.add(parent)) {
+                            parent = parent.getParentStream();
                         }
-                    } else if (windowSize < reservation) {
-                        allocation = (int) windowSize;
-                        decrementWindowSize(allocation);
-                    } else {
-                        allocation = reservation;
-                        decrementWindowSize(allocation);
                     }
+                } else if (windowSize < reservation) {
+                    allocation = (int) windowSize;
+                    decrementWindowSize(allocation);
+                } else {
+                    allocation = reservation;
+                    decrementWindowSize(allocation);
                 }
-                if (allocation == 0) {
-                    if (block) {
-                        try {
-                            // Connection level window is empty. Although this
-                            // request is for a stream, use the connection
-                            // timeout
-                            long writeTimeout = protocol.getWriteTimeout();
-                            stream.waitForConnectionAllocation(writeTimeout);
-                            // Has this stream been granted an allocation
-                            // Note: If the stream in not in this Map then the
-                            //       requested write has been fully allocated
-                            BacklogTracker tracker;
-                            // Ensure allocations made in other threads are 
visible
-                            synchronized (this) {
-                                tracker = backLogStreams.get(stream);
-                            }
-                            if (tracker != null && 
tracker.getUnusedAllocation() == 0) {
-                                String msg;
-                                Http2Error error;
-                                if (stream.isActive()) {
-                                    if (log.isDebugEnabled()) {
-                                        
log.debug(sm.getString("upgradeHandler.noAllocation",
-                                                connectionId, 
stream.getIdAsString()));
-                                    }
-                                    // No allocation
-                                    // Close the connection. Do this first 
since
-                                    // closing the stream will raise an 
exception.
-                                    close();
-                                    msg = sm.getString("stream.writeTimeout");
-                                    error = Http2Error.ENHANCE_YOUR_CALM;
-                                } else {
-                                    msg = sm.getString("stream.clientCancel");
-                                    error = Http2Error.STREAM_CLOSED;
+            }
+            if (allocation == 0) {
+                if (block) {
+                    try {
+                        // Connection level window is empty. Although this
+                        // request is for a stream, use the connection
+                        // timeout
+                        long writeTimeout = protocol.getWriteTimeout();
+                        stream.waitForConnectionAllocation(writeTimeout);
+                        // Has this stream been granted an allocation
+                        if (stream.getConnectionAllocationMade() == 0) {
+                            String msg;
+                            Http2Error error;
+                            if (stream.isActive()) {
+                                if (log.isDebugEnabled()) {
+                                    
log.debug(sm.getString("upgradeHandler.noAllocation",
+                                            connectionId, 
stream.getIdAsString()));
                                 }
-                                // Close the stream
-                                // This thread is in application code so need
-                                // to signal to the application that the
-                                // stream is closing
-                                stream.doStreamCancel(msg, error);
+                                // No allocation
+                                // Close the connection. Do this first since
+                                // closing the stream will raise an exception.
+                                close();
+                                msg = sm.getString("stream.writeTimeout");
+                                error = Http2Error.ENHANCE_YOUR_CALM;
+                            } else {
+                                msg = sm.getString("stream.clientCancel");
+                                error = Http2Error.STREAM_CLOSED;
                             }
-                        } catch (InterruptedException e) {
-                            throw new IOException(sm.getString(
-                                    
"upgradeHandler.windowSizeReservationInterrupted", connectionId,
-                                    stream.getIdAsString(), 
Integer.toString(reservation)), e);
+                            // Close the stream
+                            // This thread is in application code so need
+                            // to signal to the application that the
+                            // stream is closing
+                            stream.doStreamCancel(msg, error);
+                        } else {
+                            allocation = stream.getConnectionAllocationMade();
+                            stream.setConnectionAllocationMade(0);
                         }
-                    } else {
-                        stream.waitForConnectionAllocationNonBlocking();
-                        return 0;
+                    } catch (InterruptedException e) {
+                        throw new IOException(sm.getString(
+                                
"upgradeHandler.windowSizeReservationInterrupted", connectionId,
+                                stream.getIdAsString(), 
Integer.toString(reservation)), e);
                     }
+                } else {
+                    stream.waitForConnectionAllocationNonBlocking();
+                    return 0;
                 }
-            } while (allocation == 0);
+            }
         }
         return allocation;
     }
@@ -983,19 +962,12 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         synchronized (this) {
             long windowSize = getWindowSize();
             if (windowSize < 1 && windowSize + increment > 0) {
-                //  Connection window is completed exhausted. Assume there will
-                // be streams to notify. The overhead is minimal if there are
-                // none.
+                // Connection window is exhausted. Assume there will be streams
+                // to notify. The overhead is minimal if there are none.
                 streamsToNotify = releaseBackLog((int) (windowSize 
+increment));
-            } else if (backLogSize > 0) {
-                // While windowSize is greater than zero, all of it has already
-                // been allocated to streams in the backlog (or just about to
-                // exit the backlog). If any of windowSize was unallocated or
-                // 'spare', backLogSize would be zero. Therefore, apply this
-                // addition allocation to the backlog.
-                streamsToNotify = releaseBackLog(increment);
+            } else {
+                super.incrementWindowSize(increment);
             }
-            super.incrementWindowSize(increment);
         }
 
         if (streamsToNotify != null) {
@@ -1029,26 +1001,34 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
     }
 
 
-    private synchronized Set<AbstractStream> releaseBackLog(int increment) {
+    private synchronized Set<AbstractStream> releaseBackLog(int increment) 
throws Http2Exception {
         Set<AbstractStream> result = new HashSet<>();
-        if (backLogSize < increment) {
+        int remaining = increment;
+        if (backLogSize < remaining) {
             // Can clear the whole backlog
-            result.addAll(backLogStreams.keySet());
-            backLogStreams.clear();
+            for (AbstractStream stream : backLogStreams) {
+                if (stream.getConnectionAllocationRequested() > 0) {
+                    
stream.setConnectionAllocationMade(stream.getConnectionAllocationRequested());
+                    stream.setConnectionAllocationRequested(0);
+                }
+            }
+            remaining -= backLogSize;
             backLogSize = 0;
+            super.incrementWindowSize(remaining);
+
+            result.addAll(backLogStreams);
+            backLogStreams.clear();
         } else {
-            int leftToAllocate = increment;
-            while (leftToAllocate > 0) {
-                leftToAllocate = allocate(this, leftToAllocate);
-            }
-            for (Entry<AbstractStream,BacklogTracker> entry : 
backLogStreams.entrySet()) {
-                int allocation = entry.getValue().getUnusedAllocation();
-                if (allocation > 0) {
-                    backLogSize -= allocation;
-                    if (!entry.getValue().isNotifyInProgress()) {
-                        result.add(entry.getKey());
-                        entry.getValue().startNotify();
-                    }
+            allocate(this, remaining);
+            Iterator<AbstractStream> streamIter = backLogStreams.iterator();
+            while (streamIter.hasNext()) {
+                AbstractStream stream = streamIter.next();
+                if (stream.getConnectionAllocationMade() > 0) {
+                    backLogSize -= stream.getConnectionAllocationMade();
+                    backLogSize -= stream.getConnectionAllocationRequested();
+                    stream.setConnectionAllocationRequested(0);
+                    result.add(stream);
+                    streamIter.remove();
                 }
             }
         }
@@ -1061,10 +1041,20 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             log.debug(sm.getString("upgradeHandler.allocate.debug", 
getConnectionId(),
                     stream.getIdAsString(), Integer.toString(allocation)));
         }
-        // Allocate to the specified stream
-        BacklogTracker tracker = backLogStreams.get(stream);
 
-        int leftToAllocate = tracker.allocate(allocation);
+        int leftToAllocate = allocation;
+
+        if (stream.getConnectionAllocationRequested() > 0) {
+            int allocatedThisTime;
+            if (allocation >= stream.getConnectionAllocationRequested()) {
+                allocatedThisTime = stream.getConnectionAllocationRequested();
+            } else {
+                allocatedThisTime = allocation;
+            }
+            
stream.setConnectionAllocationRequested(stream.getConnectionAllocationRequested()
 - allocatedThisTime);
+            
stream.setConnectionAllocationMade(stream.getConnectionAllocationMade() + 
allocatedThisTime);
+            leftToAllocate = leftToAllocate - allocatedThisTime;
+        }
 
         if (leftToAllocate == 0) {
             return 0;
@@ -1078,14 +1068,17 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         // Recipients are children of the current stream that are in the
         // backlog.
         Set<AbstractStream> recipients = new 
HashSet<>(stream.getChildStreams());
-        recipients.retainAll(backLogStreams.keySet());
+        recipients.retainAll(backLogStreams);
 
         // Loop until we run out of allocation or recipients
         while (leftToAllocate > 0) {
             if (recipients.size() == 0) {
-                if (tracker.getUnusedAllocation() == 0) {
+                if (stream.getConnectionAllocationMade() == 0) {
                     backLogStreams.remove(stream);
                 }
+                if (stream.getIdAsInt() == 0) {
+                    throw new IllegalStateException();
+                }
                 return leftToAllocate;
             }
 
@@ -1832,8 +1825,7 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             if (average < overheadThreshold) {
                 // For Streams, client might only release the minimum so check
                 // against current demand
-                BacklogTracker tracker = backLogStreams.get(stream);
-                if (tracker == null || increment < 
tracker.getRemainingReservation()) {
+                if (increment < stream.getConnectionAllocationRequested()) {
                     // The smaller the increment, the larger the overhead
                     increaseOverheadCount(FrameType.WINDOW_UPDATE, 
overheadThreshold / average);
                 }
@@ -2044,80 +2036,4 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             payload = ByteBuffer.allocate(payload.capacity() * 2);
         }
     }
-
-
-    private static class BacklogTracker {
-
-        private int remainingReservation;
-        private int unusedAllocation;
-        private boolean notifyInProgress;
-
-        public BacklogTracker() {
-        }
-
-        public BacklogTracker(int reservation) {
-            remainingReservation = reservation;
-        }
-
-        /**
-         * @return The number of bytes requiring an allocation from the
-         *         Connection flow control window
-         */
-        public int getRemainingReservation() {
-            return remainingReservation;
-        }
-
-        /**
-         *
-         * @return The number of bytes allocated from the Connection flow
-         *         control window but not yet written
-         */
-        public int getUnusedAllocation() {
-            return unusedAllocation;
-        }
-
-        /**
-         * The purpose of this is to avoid the incorrect triggering of a 
timeout
-         * for the following sequence of events:
-         * <ol>
-         * <li>window update 1</li>
-         * <li>allocation 1</li>
-         * <li>notify 1</li>
-         * <li>window update 2</li>
-         * <li>allocation 2</li>
-         * <li>act on notify 1 (using allocation 1 and 2)</li>
-         * <li>notify 2</li>
-         * <li>act on notify 2 (timeout due to no allocation)</li>
-         * </ol>
-         *
-         * @return {@code true} if a notify has been issued but the associated
-         *         allocation has not been used, otherwise {@code false}
-         */
-        public boolean isNotifyInProgress() {
-            return notifyInProgress;
-        }
-
-        public void useAllocation() {
-            unusedAllocation = 0;
-            notifyInProgress = false;
-        }
-
-        public void startNotify() {
-            notifyInProgress = true;
-        }
-
-        private int allocate(int allocation) {
-            if (remainingReservation >= allocation) {
-                remainingReservation -= allocation;
-                unusedAllocation += allocation;
-                return 0;
-            }
-
-            int left = allocation - remainingReservation;
-            unusedAllocation += remainingReservation;
-            remainingReservation = 0;
-
-            return left;
-        }
-    }
 }
diff --git a/java/org/apache/coyote/http2/LocalStrings.properties 
b/java/org/apache/coyote/http2/LocalStrings.properties
index cf36ded..02f7abe 100644
--- a/java/org/apache/coyote/http2/LocalStrings.properties
+++ b/java/org/apache/coyote/http2/LocalStrings.properties
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+abstractStream.setConnectionAllocationRequested=Connection [{0}], Stream 
[{1}], connection allocation requested changed from [{2}] to [{3}]
+abstractStream.setConnectionAllocationMade=Connection [{0}], Stream [{1}], 
connection allocation made changed from [{2}] to [{3}]
 abstractStream.windowSizeDec=Connection [{0}], Stream [{1}], reduce flow 
control window by [{2}] to [{3}]
 abstractStream.windowSizeInc=Connection [{0}], Stream [{1}], increase flow 
control window by [{2}] to [{3}]
 abstractStream.windowSizeTooBig=Connection [{0}], Stream [{1}], increased 
window size by [{2}] to [{3}] which exceeded permitted maximum
@@ -169,7 +171,7 @@ upgradeHandler.writePushHeaders=Connection [{0}], Stream 
[{1}], Pushed stream [{
 windowAllocationManager.dispatched=Connection [{0}], Stream [{1}], Dispatched
 windowAllocationManager.notified=Connection [{0}], Stream [{1}], Notified
 windowAllocationManager.notify=Connection [{0}], Stream [{1}], Waiting type 
[{2}], Notify type [{3}]
-windowAllocationManager.waitFor.connection=Connection [{0}], Stream [{1}], 
Waiting for Connection flow control window (blocking) with timeout [{2}]
+windowAllocationManager.waitFor.connection=Connection [{0}], Stream [{1}], 
Waiting for [{2}] bytes from connection flow control window (blocking) with 
timeout [{3}]
 windowAllocationManager.waitFor.ise=Connection [{0}], Stream [{1}], Already 
waiting
 windowAllocationManager.waitFor.stream=Connection [{0}], Stream [{1}], Waiting 
for Stream flow control window (blocking) with timeout [{2}]
 windowAllocationManager.waitForNonBlocking.connection=Connection [{0}], Stream 
[{1}], Waiting for Connection flow control window (non-blocking)
diff --git a/java/org/apache/coyote/http2/WindowAllocationManager.java 
b/java/org/apache/coyote/http2/WindowAllocationManager.java
index 6c824a5..45ac2fd 100644
--- a/java/org/apache/coyote/http2/WindowAllocationManager.java
+++ b/java/org/apache/coyote/http2/WindowAllocationManager.java
@@ -78,7 +78,8 @@ class WindowAllocationManager {
     void waitForConnection(long timeout) throws InterruptedException {
         if (log.isDebugEnabled()) {
             
log.debug(sm.getString("windowAllocationManager.waitFor.connection",
-                    stream.getConnectionId(), stream.getIdAsString(), 
Long.toString(timeout)));
+                    stream.getConnectionId(), stream.getIdAsString(),
+                    
Integer.toString(stream.getConnectionAllocationRequested()), 
Long.toString(timeout)));
         }
 
         waitFor(CONNECTION, timeout);
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index ad95582..ab5b181 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -125,6 +125,12 @@
         Correct a potential <code>StackOverflowException</code> with HTTP/2 and
         sendfile. (markt)
       </fix>
+      <fix>
+        Further improvements in the management of the connection flow control
+        window. This addresses various bugs that what cause streams to
+        incorrectly report that they had timed out waiting for an allocation
+        from the connection flow control window. (markt)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Jasper">

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to