This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 8.5.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit fe31cb697a030f8fb703262082dc983150e684f5 Author: Mark Thomas <ma...@apache.org> AuthorDate: Thu Sep 24 18:11:37 2020 +0100 Reduce memory footprint of closed http/2 streams This refactoring replaces closed streams with a new RecycledStream object and changes the mechanism used to look up known streams. Refactoring getStream to handle differences between Stream and RecycledStream --- .../apache/coyote/http2/AbstractNonZeroStream.java | 2 + .../apache/coyote/http2/Http2UpgradeHandler.java | 131 ++++++++++++++------- java/org/apache/coyote/http2/RecycledStream.java | 26 +++- java/org/apache/coyote/http2/Stream.java | 22 ++-- 4 files changed, 116 insertions(+), 65 deletions(-) diff --git a/java/org/apache/coyote/http2/AbstractNonZeroStream.java b/java/org/apache/coyote/http2/AbstractNonZeroStream.java index d1eb38a..e1764a1 100644 --- a/java/org/apache/coyote/http2/AbstractNonZeroStream.java +++ b/java/org/apache/coyote/http2/AbstractNonZeroStream.java @@ -98,4 +98,6 @@ abstract class AbstractNonZeroStream extends AbstractStream { } abstract boolean isClosedFinal(); + + abstract void checkState(FrameType frameType) throws Http2Exception; } diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java index 4e15e9d..751637c 100644 --- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java +++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java @@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; @@ -127,8 +126,8 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU private long keepAliveTimeout = Http2Protocol.DEFAULT_KEEP_ALIVE_TIMEOUT; private long writeTimeout = Http2Protocol.DEFAULT_WRITE_TIMEOUT; - private final Map<Integer,Stream> streams = new ConcurrentHashMap<>(); - private final AtomicInteger activeRemoteStreamCount = new AtomicInteger(0); + private final ConcurrentMap<Integer,AbstractNonZeroStream> streams = new ConcurrentHashMap<>(); + protected final AtomicInteger activeRemoteStreamCount = new AtomicInteger(0); // Start at -1 so the 'add 2' logic in closeIdleStreams() works private volatile int maxActiveRemoteStreamId = -1; private volatile int maxProcessedStreamId; @@ -1082,7 +1081,22 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU private Stream getStream(int streamId, boolean unknownIsError) throws ConnectionException { Integer key = Integer.valueOf(streamId); - Stream result = streams.get(key); + AbstractStream result = streams.get(key); + if (result instanceof Stream) { + return (Stream) result; + } + if (unknownIsError) { + // Stream has been closed and removed from the map + throw new ConnectionException(sm.getString("upgradeHandler.stream.closed", key.toString()), + Http2Error.PROTOCOL_ERROR); + } + return null; + } + + + private AbstractNonZeroStream getStreamMayBeClosed(int streamId, boolean unknownIsError) throws ConnectionException { + Integer key = Integer.valueOf(streamId); + AbstractNonZeroStream result = streams.get(key); if (result == null && unknownIsError) { // Stream has been closed and removed from the map throw new ConnectionException(sm.getString("upgradeHandler.stream.closed", key.toString()), @@ -1126,10 +1140,12 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU return; } - for (Stream stream : streams.values()) { - // The connection is closing. Close the associated streams as no - // longer required (also notifies any threads waiting for allocations). - stream.receiveReset(Http2Error.CANCEL.getCode()); + for (AbstractNonZeroStream stream : streams.values()) { + if (stream instanceof Stream) { + // The connection is closing. Close the associated streams as no + // longer required (also notifies any threads waiting for allocations). + ((Stream) stream).receiveReset(Http2Error.CANCEL.getCode()); + } } try { socketWrapper.close(); @@ -1185,10 +1201,10 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU TreeSet<Integer> candidatesStepTwo = new TreeSet<>(); TreeSet<Integer> candidatesStepThree = new TreeSet<>(); - for (Entry<Integer, Stream> entry : streams.entrySet()) { - Stream stream = entry.getValue(); + for (Entry<Integer, AbstractNonZeroStream> entry : streams.entrySet()) { + AbstractNonZeroStream stream = entry.getValue(); // Never remove active streams - if (stream.isActive()) { + if (stream instanceof Stream && ((Stream) stream).isActive()) { continue; } @@ -1208,7 +1224,7 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU // Process the step one list for (Integer streamIdToRemove : candidatesStepOne) { // Remove this childless stream - Stream removedStream = streams.remove(streamIdToRemove); + AbstractNonZeroStream removedStream = streams.remove(streamIdToRemove); removedStream.detachFromParent(); toClose--; if (log.isDebugEnabled()) { @@ -1257,7 +1273,7 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU private void removeStreamFromPriorityTree(Integer streamIdToRemove) { - Stream streamToRemove = streams.remove(streamIdToRemove); + AbstractNonZeroStream streamToRemove = streams.remove(streamIdToRemove); // Move the removed Stream's children to the removed Stream's // parent. Set<AbstractNonZeroStream> children = streamToRemove.getChildStreams(); @@ -1522,24 +1538,33 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU } } - Stream stream = getStream(streamId, true); - stream.checkState(FrameType.DATA); - stream.receivedData(payloadSize); - return stream.getInputByteBuffer(); + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, true); + if (abstractNonZeroStream instanceof Stream) { + Stream stream = (Stream) abstractNonZeroStream; + stream.checkState(FrameType.DATA); + stream.receivedData(payloadSize); + return stream.getInputByteBuffer(); + } else { + abstractNonZeroStream.checkState(FrameType.DATA); + return null; + } } @Override public void endRequestBodyFrame(int streamId) throws Http2Exception { - Stream stream = getStream(streamId, true); - stream.getInputBuffer().onDataAvailable(); + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, true); + if (abstractNonZeroStream instanceof Stream) { + ((Stream) abstractNonZeroStream).getInputBuffer().onDataAvailable(); + } } @Override public void receivedEndOfStream(int streamId) throws ConnectionException { - Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed()); - if (stream != null) { + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, connectionState.get().isNewStreamAllowed()); + if (abstractNonZeroStream instanceof Stream) { + Stream stream = (Stream) abstractNonZeroStream; stream.receivedEndOfStream(); if (!stream.isActive()) { setConnectionTimeoutForStreamCount(activeRemoteStreamCount.decrementAndGet()); @@ -1551,9 +1576,11 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU @Override public void swallowedPadding(int streamId, int paddingLength) throws ConnectionException, IOException { - Stream stream = getStream(streamId, true); - // +1 is for the payload byte used to define the padding length - writeWindowUpdate(stream, paddingLength + 1, false); + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, true); + if (abstractNonZeroStream instanceof Stream) { + // +1 is for the payload byte used to define the padding length + writeWindowUpdate((Stream) abstractNonZeroStream, paddingLength + 1, false); + } } @@ -1592,10 +1619,12 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU private void closeIdleStreams(int newMaxActiveRemoteStreamId) { - for (Entry<Integer,Stream> entry : streams.entrySet()) { - if (entry.getKey().intValue() > maxActiveRemoteStreamId && - entry.getKey().intValue() < newMaxActiveRemoteStreamId) { - entry.getValue().closeIfIdle(); + for (Entry<Integer,AbstractNonZeroStream> entry : streams.entrySet()) { + int id = entry.getKey().intValue(); + if (id > maxActiveRemoteStreamId && id < newMaxActiveRemoteStreamId) { + if (entry.getValue() instanceof Stream) { + ((Stream) entry.getValue()).closeIfIdle(); + } } } maxActiveRemoteStreamId = newMaxActiveRemoteStreamId; @@ -1612,16 +1641,15 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU increaseOverheadCount(); - Stream stream = getStream(streamId, false); - if (stream == null) { - stream = createRemoteStream(streamId); + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, false); + if (abstractNonZeroStream == null) { + abstractNonZeroStream = createRemoteStream(streamId); } - stream.checkState(FrameType.PRIORITY); - AbstractStream parentStream = getStream(parentStreamId, false); + AbstractStream parentStream = getStreamMayBeClosed(parentStreamId, false); if (parentStream == null) { parentStream = this; } - stream.rePrioritise(parentStream, exclusive, weight); + abstractNonZeroStream.rePrioritise(parentStream, exclusive, weight); } @@ -1646,9 +1674,10 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU @Override public void headersEnd(int streamId) throws Http2Exception { - Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed()); - if (stream != null) { + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, connectionState.get().isNewStreamAllowed()); + if (abstractNonZeroStream instanceof Stream) { setMaxProcessedStream(streamId); + Stream stream = (Stream) abstractNonZeroStream; if (stream.isActive()) { if (stream.receivedEndOfHeaders()) { @@ -1679,12 +1708,19 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU @Override public void reset(int streamId, long errorCode) throws Http2Exception { - Stream stream = getStream(streamId, true); - boolean active = stream.isActive(); - stream.checkState(FrameType.RST); - stream.receiveReset(errorCode); - if (active) { - activeRemoteStreamCount.decrementAndGet(); + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.reset.receive", getConnectionId(), Integer.toString(streamId), + Long.toString(errorCode))); + } + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, true); + abstractNonZeroStream.checkState(FrameType.RST); + if (abstractNonZeroStream instanceof Stream) { + Stream stream = (Stream) abstractNonZeroStream; + boolean active = stream.isActive(); + stream.receiveReset(errorCode); + if (active) { + activeRemoteStreamCount.decrementAndGet(); + } } } @@ -1705,11 +1741,11 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU // Do this first in case new value is invalid remoteSettings.set(setting, value); int diff = (int) (value - oldValue); - for (Stream stream : streams.values()) { + for (AbstractNonZeroStream stream : streams.values()) { try { stream.incrementWindowSize(diff); } catch (Http2Exception h2e) { - stream.close(new StreamException(sm.getString( + ((Stream) stream).close(new StreamException(sm.getString( "upgradeHandler.windowSizeTooBig", connectionId, stream.getIdAsString()), h2e.getError(), stream.getIdAsInt())); @@ -1782,7 +1818,7 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU incrementWindowSize(increment); } else { - Stream stream = getStream(streamId, true); + AbstractNonZeroStream stream = getStreamMayBeClosed(streamId, true); // Check for small increments which are inefficient if (average < overheadThreshold) { @@ -1808,6 +1844,11 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU } + void replaceStream(AbstractNonZeroStream original, AbstractNonZeroStream replacement) { + streams.replace(original.getIdentifier(), replacement); + } + + private class PingManager { protected boolean initiateDisabled = false; diff --git a/java/org/apache/coyote/http2/RecycledStream.java b/java/org/apache/coyote/http2/RecycledStream.java index c17ae02..7d408ea 100644 --- a/java/org/apache/coyote/http2/RecycledStream.java +++ b/java/org/apache/coyote/http2/RecycledStream.java @@ -23,12 +23,12 @@ package org.apache.coyote.http2; class RecycledStream extends AbstractNonZeroStream { private final String connectionId; - private final boolean closedFinal; + private final StreamStateMachine state; - RecycledStream(Stream stream) { - super(stream.getIdentifier(), stream.getWeight()); - connectionId = stream.getConnectionId(); - closedFinal = stream.isClosedFinal(); + RecycledStream(String connectionId, Integer identifier, int weight, StreamStateMachine state) { + super(identifier, weight); + this.connectionId = connectionId; + this.state = state; } @@ -40,7 +40,20 @@ class RecycledStream extends AbstractNonZeroStream { @Override boolean isClosedFinal() { - return closedFinal; + return state.isClosedFinal(); + } + + + @Override + final void checkState(FrameType frameType) throws Http2Exception { + state.checkFrameType(frameType); + } + + + @SuppressWarnings("sync-override") + @Override + protected void incrementWindowSize(int increment) throws Http2Exception { + // NO-OP } @@ -50,3 +63,4 @@ class RecycledStream extends AbstractNonZeroStream { // NO-OP. Unused. } } + diff --git a/java/org/apache/coyote/http2/Stream.java b/java/org/apache/coyote/http2/Stream.java index 5f3543f..78a288c 100644 --- a/java/org/apache/coyote/http2/Stream.java +++ b/java/org/apache/coyote/http2/Stream.java @@ -191,7 +191,8 @@ public class Stream extends AbstractNonZeroStream implements HeaderEmitter { } - void checkState(FrameType frameType) throws Http2Exception { + @Override + final void checkState(FrameType frameType) throws Http2Exception { state.checkFrameType(frameType); } @@ -658,24 +659,17 @@ public class Stream extends AbstractNonZeroStream implements HeaderEmitter { /* * This method is called recycle for consistency with the rest of the Tomcat - * code base. Currently, it only sets references to null for the purposes of - * reducing memory footprint. It does not fully recycle the Stream ready for - * re-use since Stream objects are not re-used. This is useful because - * Stream instances are retained for a period after the Stream closes. + * code base. Currently, it calls the handler to replace this stream with an + * implementation that uses less memory. It does not fully recycle the + * Stream ready for re-use since Stream objects are not re-used. This is + * useful because Stream instances are retained for a period after the + * Stream closes. */ final void recycle() { if (log.isDebugEnabled()) { log.debug(sm.getString("stream.recycle", getConnectionId(), getIdAsString())); } - /* - * Temporarily disabled due to multiple regressions (NPEs) - coyoteRequest = null; - cookieHeader = null; - coyoteResponse = null; - inputBuffer = null; - streamOutputBuffer = null; - http2OutputBuffer = null; - */ + handler.replaceStream(this, new RecycledStream(getConnectionId(), getIdentifier(), getWeight(), state)); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org