This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit 496b3cf5dd99179cd5460c728819bd799f662f99
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Thu Sep 24 18:11:37 2020 +0100

    Reduce memory footprint of closed http/2 streams
    
    This refactoring replaces closed streams with a new RecycledStream
    object and changes the mechanism used to look up known streams.
    getStream is refactored to handle the differences between Stream and
    RecycledStream.
---
 .../apache/coyote/http2/AbstractNonZeroStream.java |   2 +
 .../apache/coyote/http2/Http2UpgradeHandler.java   | 125 ++++++++++++++-------
 java/org/apache/coyote/http2/RecycledStream.java   |  26 ++++-
 java/org/apache/coyote/http2/Stream.java           |  20 ++--
 4 files changed, 111 insertions(+), 62 deletions(-)

diff --git a/java/org/apache/coyote/http2/AbstractNonZeroStream.java 
b/java/org/apache/coyote/http2/AbstractNonZeroStream.java
index 70cfe35..582ab1e 100644
--- a/java/org/apache/coyote/http2/AbstractNonZeroStream.java
+++ b/java/org/apache/coyote/http2/AbstractNonZeroStream.java
@@ -98,4 +98,6 @@ abstract class AbstractNonZeroStream extends AbstractStream {
     }
 
     abstract boolean isClosedFinal();
+
+    abstract void checkState(FrameType frameType) throws Http2Exception;
 }
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java 
b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index ad5e0e4..c51efd9 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -121,7 +122,7 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
     private HpackDecoder hpackDecoder;
     private HpackEncoder hpackEncoder;
 
-    private final Map<Integer,Stream> streams = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer,AbstractNonZeroStream> streams = new 
ConcurrentHashMap<>();
     protected final AtomicInteger activeRemoteStreamCount = new 
AtomicInteger(0);
     // Start at -1 so the 'add 2' logic in closeIdleStreams() works
     private volatile int maxActiveRemoteStreamId = -1;
@@ -1089,7 +1090,22 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
 
     private Stream getStream(int streamId, boolean unknownIsError) throws 
ConnectionException {
         Integer key = Integer.valueOf(streamId);
-        Stream result = streams.get(key);
+        AbstractStream result = streams.get(key);
+        if (result instanceof Stream) {
+            return (Stream) result;
+        }
+        if (unknownIsError) {
+            // Stream has been closed and removed from the map
+            throw new 
ConnectionException(sm.getString("upgradeHandler.stream.closed", 
key.toString()),
+                    Http2Error.PROTOCOL_ERROR);
+        }
+        return null;
+    }
+
+
+    private AbstractNonZeroStream getStreamMayBeClosed(int streamId, boolean 
unknownIsError) throws ConnectionException {
+        Integer key = Integer.valueOf(streamId);
+        AbstractNonZeroStream result = streams.get(key);
         if (result == null && unknownIsError) {
             // Stream has been closed and removed from the map
             throw new 
ConnectionException(sm.getString("upgradeHandler.stream.closed", 
key.toString()),
@@ -1133,10 +1149,12 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             return;
         }
 
-        for (Stream stream : streams.values()) {
-            // The connection is closing. Close the associated streams as no
-            // longer required (also notifies any threads waiting for 
allocations).
-            stream.receiveReset(Http2Error.CANCEL.getCode());
+        for (AbstractNonZeroStream stream : streams.values()) {
+            if (stream instanceof Stream) {
+                // The connection is closing. Close the associated streams as 
no
+                // longer required (also notifies any threads waiting for 
allocations).
+                ((Stream) stream).receiveReset(Http2Error.CANCEL.getCode());
+            }
         }
         try {
             socketWrapper.close();
@@ -1193,10 +1211,10 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         TreeSet<Integer> candidatesStepTwo = new TreeSet<>();
         TreeSet<Integer> candidatesStepThree = new TreeSet<>();
 
-        for (Entry<Integer, Stream> entry : streams.entrySet()) {
-            Stream stream = entry.getValue();
+        for (Entry<Integer, AbstractNonZeroStream> entry : streams.entrySet()) 
{
+            AbstractNonZeroStream stream = entry.getValue();
             // Never remove active streams
-            if (stream.isActive()) {
+            if (stream instanceof Stream && ((Stream) stream).isActive()) {
                 continue;
             }
 
@@ -1217,7 +1235,7 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         // Process the step one list
         for (Integer streamIdToRemove : candidatesStepOne) {
             // Remove this childless stream
-            Stream removedStream = streams.remove(streamIdToRemove);
+            AbstractNonZeroStream removedStream = 
streams.remove(streamIdToRemove);
             removedStream.detachFromParent();
             toClose--;
             if (log.isDebugEnabled()) {
@@ -1266,7 +1284,7 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
 
 
     private void removeStreamFromPriorityTree(Integer streamIdToRemove) {
-        Stream streamToRemove = streams.remove(streamIdToRemove);
+        AbstractNonZeroStream streamToRemove = 
streams.remove(streamIdToRemove);
         // Move the removed Stream's children to the removed Stream's
         // parent.
         Set<AbstractNonZeroStream> children = streamToRemove.getChildStreams();
@@ -1415,24 +1433,33 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             }
         }
 
-        Stream stream = getStream(streamId, true);
-        stream.checkState(FrameType.DATA);
-        stream.receivedData(payloadSize);
-        return stream.getInputByteBuffer();
+        AbstractNonZeroStream abstractNonZeroStream = 
getStreamMayBeClosed(streamId, true);
+        if (abstractNonZeroStream instanceof Stream) {
+            Stream stream = (Stream) abstractNonZeroStream;
+            stream.checkState(FrameType.DATA);
+            stream.receivedData(payloadSize);
+            return stream.getInputByteBuffer();
+        } else {
+            abstractNonZeroStream.checkState(FrameType.DATA);
+            return null;
+        }
     }
 
 
     @Override
     public void endRequestBodyFrame(int streamId) throws Http2Exception {
-        Stream stream = getStream(streamId, true);
-        stream.getInputBuffer().onDataAvailable();
+        AbstractNonZeroStream abstractNonZeroStream = 
getStreamMayBeClosed(streamId, true);
+        if (abstractNonZeroStream instanceof Stream) {
+            ((Stream) 
abstractNonZeroStream).getInputBuffer().onDataAvailable();
+        }
     }
 
 
     @Override
     public void receivedEndOfStream(int streamId) throws ConnectionException {
-        Stream stream = getStream(streamId, 
connectionState.get().isNewStreamAllowed());
-        if (stream != null) {
+        AbstractNonZeroStream abstractNonZeroStream = 
getStreamMayBeClosed(streamId, connectionState.get().isNewStreamAllowed());
+        if (abstractNonZeroStream instanceof Stream) {
+            Stream stream = (Stream) abstractNonZeroStream;
             stream.receivedEndOfStream();
             if (!stream.isActive()) {
                 
setConnectionTimeoutForStreamCount(activeRemoteStreamCount.decrementAndGet());
@@ -1444,9 +1471,11 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
     @Override
     public void swallowedPadding(int streamId, int paddingLength) throws
             ConnectionException, IOException {
-        Stream stream = getStream(streamId, true);
-        // +1 is for the payload byte used to define the padding length
-        writeWindowUpdate(stream, paddingLength + 1, false);
+        AbstractNonZeroStream abstractNonZeroStream = 
getStreamMayBeClosed(streamId, true);
+        if (abstractNonZeroStream instanceof Stream) {
+            // +1 is for the payload byte used to define the padding length
+            writeWindowUpdate((Stream) abstractNonZeroStream, paddingLength + 
1, false);
+        }
     }
 
 
@@ -1485,10 +1514,12 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
 
 
     private void closeIdleStreams(int newMaxActiveRemoteStreamId) {
-        for (Entry<Integer,Stream> entry : streams.entrySet()) {
-            if (entry.getKey().intValue() > maxActiveRemoteStreamId &&
-                    entry.getKey().intValue() < newMaxActiveRemoteStreamId) {
-                entry.getValue().closeIfIdle();
+        for (Entry<Integer,AbstractNonZeroStream> entry : streams.entrySet()) {
+            int id = entry.getKey().intValue();
+            if (id > maxActiveRemoteStreamId && id < 
newMaxActiveRemoteStreamId) {
+                if (entry.getValue() instanceof Stream) {
+                    ((Stream) entry.getValue()).closeIfIdle();
+                }
             }
         }
         maxActiveRemoteStreamId = newMaxActiveRemoteStreamId;
@@ -1505,16 +1536,15 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
 
         increaseOverheadCount();
 
-        Stream stream = getStream(streamId, false);
-        if (stream == null) {
-            stream = createRemoteStream(streamId);
+        AbstractNonZeroStream abstractNonZeroStream = 
getStreamMayBeClosed(streamId, false);
+        if (abstractNonZeroStream == null) {
+            abstractNonZeroStream = createRemoteStream(streamId);
         }
-        stream.checkState(FrameType.PRIORITY);
-        AbstractStream parentStream = getStream(parentStreamId, false);
+        AbstractStream parentStream = getStreamMayBeClosed(parentStreamId, 
false);
         if (parentStream == null) {
             parentStream = this;
         }
-        stream.rePrioritise(parentStream, exclusive, weight);
+        abstractNonZeroStream.rePrioritise(parentStream, exclusive, weight);
     }
 
 
@@ -1539,9 +1569,10 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
 
     @Override
     public void headersEnd(int streamId) throws Http2Exception {
-        Stream stream = getStream(streamId, 
connectionState.get().isNewStreamAllowed());
-        if (stream != null) {
+        AbstractNonZeroStream abstractNonZeroStream = 
getStreamMayBeClosed(streamId, connectionState.get().isNewStreamAllowed());
+        if (abstractNonZeroStream instanceof Stream) {
             setMaxProcessedStream(streamId);
+            Stream stream = (Stream) abstractNonZeroStream;
             if (stream.isActive()) {
                 if (stream.receivedEndOfHeaders()) {
 
@@ -1576,12 +1607,15 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             log.debug(sm.getString("upgradeHandler.reset.receive", 
getConnectionId(), Integer.toString(streamId),
                     Long.toString(errorCode)));
         }
-        Stream stream = getStream(streamId, true);
-        boolean active = stream.isActive();
-        stream.checkState(FrameType.RST);
-        stream.receiveReset(errorCode);
-        if (active) {
-            activeRemoteStreamCount.decrementAndGet();
+        AbstractNonZeroStream abstractNonZeroStream = 
getStreamMayBeClosed(streamId, true);
+        abstractNonZeroStream.checkState(FrameType.RST);
+        if (abstractNonZeroStream instanceof Stream) {
+            Stream stream = (Stream) abstractNonZeroStream;
+            boolean active = stream.isActive();
+            stream.receiveReset(errorCode);
+            if (active) {
+                activeRemoteStreamCount.decrementAndGet();
+            }
         }
     }
 
@@ -1602,11 +1636,11 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             // Do this first in case new value is invalid
             remoteSettings.set(setting, value);
             int diff = (int) (value - oldValue);
-            for (Stream stream : streams.values()) {
+            for (AbstractNonZeroStream stream : streams.values()) {
                 try {
                     stream.incrementWindowSize(diff);
                 } catch (Http2Exception h2e) {
-                    stream.close(new StreamException(sm.getString(
+                    ((Stream) stream).close(new StreamException(sm.getString(
                             "upgradeHandler.windowSizeTooBig", connectionId,
                             stream.getIdAsString()),
                             h2e.getError(), stream.getIdAsInt()));
@@ -1679,7 +1713,7 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
 
             incrementWindowSize(increment);
         } else {
-            Stream stream = getStream(streamId, true);
+            AbstractNonZeroStream stream = getStreamMayBeClosed(streamId, 
true);
 
             // Check for small increments which are inefficient
             if (average < overheadThreshold) {
@@ -1705,6 +1739,11 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
     }
 
 
+    void replaceStream(AbstractNonZeroStream original, AbstractNonZeroStream 
replacement) {
+        streams.replace(original.getIdentifier(), replacement);
+    }
+
+
     protected class PingManager {
 
         protected boolean initiateDisabled = false;
diff --git a/java/org/apache/coyote/http2/RecycledStream.java 
b/java/org/apache/coyote/http2/RecycledStream.java
index dbbdc10..1915dff 100644
--- a/java/org/apache/coyote/http2/RecycledStream.java
+++ b/java/org/apache/coyote/http2/RecycledStream.java
@@ -23,12 +23,12 @@ package org.apache.coyote.http2;
 class RecycledStream extends AbstractNonZeroStream {
 
     private final String connectionId;
-    private final boolean closedFinal;
+    private final StreamStateMachine state;
 
-    RecycledStream(Stream stream) {
-        super(stream.getIdentifier(), stream.getWeight());
-        connectionId = stream.getConnectionId();
-        closedFinal = stream.isClosedFinal();
+    RecycledStream(String connectionId, Integer identifier, int weight, 
StreamStateMachine state) {
+        super(identifier, weight);
+        this.connectionId = connectionId;
+        this.state = state;
     }
 
 
@@ -40,6 +40,20 @@ class RecycledStream extends AbstractNonZeroStream {
 
     @Override
     boolean isClosedFinal() {
-        return closedFinal;
+        return state.isClosedFinal();
+    }
+
+
+    @Override
+    final void checkState(FrameType frameType) throws Http2Exception {
+        state.checkFrameType(frameType);
+    }
+
+
+    @SuppressWarnings("sync-override")
+    @Override
+    void incrementWindowSize(int increment) throws Http2Exception {
+        // NO-OP
     }
 }
+
diff --git a/java/org/apache/coyote/http2/Stream.java 
b/java/org/apache/coyote/http2/Stream.java
index 489ce53..8aa4a7b 100644
--- a/java/org/apache/coyote/http2/Stream.java
+++ b/java/org/apache/coyote/http2/Stream.java
@@ -193,6 +193,7 @@ class Stream extends AbstractNonZeroStream implements 
HeaderEmitter {
     }
 
 
+    @Override
     final void checkState(FrameType frameType) throws Http2Exception {
         state.checkFrameType(frameType);
     }
@@ -688,24 +689,17 @@ class Stream extends AbstractNonZeroStream implements 
HeaderEmitter {
 
     /*
      * This method is called recycle for consistency with the rest of the 
Tomcat
-     * code base. Currently, it only sets references to null for the purposes 
of
-     * reducing memory footprint. It does not fully recycle the Stream ready 
for
-     * re-use since Stream objects are not re-used. This is useful because
-     * Stream instances are retained for a period after the Stream closes.
+     * code base. Currently, it calls the handler to replace this stream with 
an
+     * implementation that uses less memory. It does not fully recycle the
+     * Stream ready for re-use since Stream objects are not re-used. This is
+     * useful because Stream instances are retained for a period after the
+     * Stream closes.
      */
     final void recycle() {
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("stream.recycle", getConnectionId(), 
getIdAsString()));
         }
-        /*
-         * Temporarily disabled due to multiple regressions (NPEs)
-        coyoteRequest = null;
-        cookieHeader = null;
-        coyoteResponse = null;
-        inputBuffer = null;
-        streamOutputBuffer = null;
-        http2OutputBuffer = null;
-        */
+        handler.replaceStream(this, new RecycledStream(getConnectionId(), 
getIdentifier(), getWeight(), state));
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to