Author: markt
Date: Fri Aug 17 19:16:56 2018
New Revision: 1838275
URL: http://svn.apache.org/viewvc?rev=1838275&view=rev
Log:
Additional fixes for output corruption of response bodies when writing large
bodies using asynchronous processing over HTTP/2.
Added:
tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java (with props)
Modified:
tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
tomcat/trunk/java/org/apache/coyote/http2/Stream.java
tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java
tomcat/trunk/webapps/docs/changelog.xml
Modified:
tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java?rev=1838275&r1=1838274&r2=1838275&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java Fri
Aug 17 19:16:56 2018
@@ -280,7 +280,7 @@ public class Http2AsyncUpgradeHandler ex
// Reserve as much as possible right away
int reservation = (sendfile.end - sendfile.pos >
Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) (sendfile.end - sendfile.pos);
sendfile.streamReservation =
sendfile.stream.reserveWindowSize(reservation, true);
- sendfile.connectionReservation =
reserveWindowSize(sendfile.stream, sendfile.streamReservation);
+ sendfile.connectionReservation =
reserveWindowSize(sendfile.stream, sendfile.streamReservation, true);
} catch (IOException e) {
return SendfileState.ERROR;
}
@@ -340,7 +340,7 @@ public class Http2AsyncUpgradeHandler ex
int reservation = (sendfile.end - sendfile.pos >
Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) (sendfile.end - sendfile.pos);
sendfile.streamReservation =
sendfile.stream.reserveWindowSize(reservation, true);
}
- sendfile.connectionReservation =
reserveWindowSize(sendfile.stream, sendfile.streamReservation);
+ sendfile.connectionReservation =
reserveWindowSize(sendfile.stream, sendfile.streamReservation, true);
}
} catch (IOException e) {
failed (e, sendfile);
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1838275&r1=1838274&r2=1838275&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Fri Aug
17 19:16:56 2018
@@ -723,7 +723,7 @@ class Http2UpgradeHandler extends Abstra
}
- int reserveWindowSize(Stream stream, int reservation) throws IOException {
+ int reserveWindowSize(Stream stream, int reservation, boolean block)
throws IOException {
// Need to be holding the stream lock so releaseBacklog() can't notify
// this thread until after this thread enters wait()
int allocation = 0;
@@ -775,12 +775,16 @@ class Http2UpgradeHandler extends Abstra
}
}
if (allocation == 0) {
- try {
- stream.wait();
- } catch (InterruptedException e) {
- throw new IOException(sm.getString(
-
"upgradeHandler.windowSizeReservationInterrupted", connectionId,
- stream.getIdentifier(),
Integer.toString(reservation)), e);
+ if (block) {
+ try {
+ stream.wait();
+ } catch (InterruptedException e) {
+ throw new IOException(sm.getString(
+
"upgradeHandler.windowSizeReservationInterrupted", connectionId,
+ stream.getIdentifier(),
Integer.toString(reservation)), e);
+ }
+ } else {
+ return 0;
}
}
} while (allocation == 0);
Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1838275&r1=1838274&r2=1838275&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Fri Aug 17 19:16:56
2018
@@ -718,6 +718,7 @@ class Stream extends AbstractStream impl
private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024);
private volatile long written = 0;
+ private volatile int streamReservation = 0;
private volatile boolean closed = false;
private volatile boolean endOfStreamSent = false;
@@ -732,25 +733,31 @@ class Stream extends AbstractStream impl
throw new IllegalStateException(
sm.getString("stream.closed", getConnectionId(),
getIdentifier()));
}
- int chunkLimit = chunk.limit();
- int offset = 0;
- while (chunk.remaining() > 0) {
- int thisTime = Math.min(buffer.remaining(), chunk.remaining());
- chunk.limit(chunk.position() + thisTime);
- buffer.put(chunk);
- chunk.limit(chunkLimit);
- offset += thisTime;
- if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
- // Only flush if we have more data to write and the buffer
- // is full
- if (flush(true, coyoteResponse.getWriteListener() ==
null)) {
- writeBuffer.add(chunk);
- break;
+ int totalThisTime = 0;
+ if (writeBuffer.isEmpty()) {
+ int chunkLimit = chunk.limit();
+ while (chunk.remaining() > 0) {
+ int thisTime = Math.min(buffer.remaining(),
chunk.remaining());
+ chunk.limit(chunk.position() + thisTime);
+ buffer.put(chunk);
+ chunk.limit(chunkLimit);
+ totalThisTime += thisTime;
+ if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
+ // Only flush if we have more data to write and the
buffer
+ // is full
+ if (flush(true, coyoteResponse.getWriteListener() ==
null)) {
+ totalThisTime += chunk.remaining();
+ writeBuffer.add(chunk);
+ break;
+ }
}
}
+ } else {
+ totalThisTime = chunk.remaining();
+ writeBuffer.add(chunk);
}
- written += offset;
- return offset;
+ written += totalThisTime;
+ return totalThisTime;
}
final synchronized boolean flush(boolean block) throws IOException {
@@ -803,15 +810,22 @@ class Stream extends AbstractStream impl
buffer.flip();
int left = buffer.remaining();
while (left > 0) {
- int streamReservation = reserveWindowSize(left, block);
if (streamReservation == 0) {
- // Must be non-blocking
- buffer.compact();
- return true;
+ streamReservation = reserveWindowSize(left, block);
+ if (streamReservation == 0) {
+ // Must be non-blocking
+ buffer.compact();
+ return true;
+ }
}
while (streamReservation > 0) {
int connectionReservation =
- handler.reserveWindowSize(Stream.this,
streamReservation);
+ handler.reserveWindowSize(Stream.this,
streamReservation, block);
+ if (connectionReservation == 0) {
+ // Must be non-blocking
+ buffer.compact();
+ return true;
+ }
// Do the write
handler.writeBody(Stream.this, buffer,
connectionReservation,
!writeInProgress && closed && left ==
connectionReservation &&
@@ -858,16 +872,14 @@ class Stream extends AbstractStream impl
}
@Override
- public boolean writeFromBuffer(ByteBuffer src, boolean blocking)
throws IOException {
+ public synchronized boolean writeFromBuffer(ByteBuffer src, boolean
blocking) throws IOException {
int chunkLimit = src.limit();
- int offset = 0;
while (src.remaining() > 0) {
int thisTime = Math.min(buffer.remaining(), src.remaining());
src.limit(src.position() + thisTime);
buffer.put(src);
src.limit(chunkLimit);
- written += offset;
- if (flush(true, blocking)) {
+ if (flush(false, blocking)) {
return true;
}
}
Modified: tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java?rev=1838275&r1=1838274&r2=1838275&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java (original)
+++ tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java Fri Aug 17
19:16:56 2018
@@ -890,6 +890,7 @@ public abstract class Http2TestBase exte
private ConnectionSettingsRemote remoteSettings = new
ConnectionSettingsRemote("-1");
private boolean traceBody = false;
private ByteBuffer bodyBuffer = null;
+ private long bytesRead;
public void setTraceBody(boolean traceBody) {
this.traceBody = traceBody;
@@ -905,6 +906,7 @@ public abstract class Http2TestBase exte
@Override
public ByteBuffer startRequestBodyFrame(int streamId, int payloadSize)
{
lastStreamId = Integer.toString(streamId);
+ bytesRead += payloadSize;
if (traceBody) {
bodyBuffer = ByteBuffer.allocate(payloadSize);
return bodyBuffer;
@@ -1068,6 +1070,7 @@ public abstract class Http2TestBase exte
public void clearTrace() {
trace = new StringBuffer();
+ bytesRead = 0;
}
@@ -1079,6 +1082,11 @@ public abstract class Http2TestBase exte
public int getMaxFrameSize() {
return remoteSettings.getMaxFrameSize();
}
+
+
+ public long getBytesRead() {
+ return bytesRead;
+ }
}
@@ -1101,6 +1109,8 @@ public abstract class Http2TestBase exte
private static final long serialVersionUID = 1L;
+ public static final int CONTENT_LENGTH = 8192;
+
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
Added: tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java?rev=1838275&view=auto
==============================================================================
--- tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java (added)
+++ tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java Fri Aug 17
19:16:56 2018
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.coyote.http2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.catalina.Context;
+import org.apache.catalina.Wrapper;
+import org.apache.catalina.startup.Tomcat;
+
+public class TestAsync extends Http2TestBase {
+
+ private static final int BLOCK_SIZE = 0x8000;
+
+ // https://bz.apache.org/bugzilla/show_bug.cgi?id=62614
+ @Test
+ public void testEmptyBothWindowsUpdateConnectionFirst() throws Exception {
+ doEmptyWindowTest(true, false, false);
+ }
+
+
+ @Test
+ public void testEmptyBothWindowsUpdateStreamFirst() throws Exception {
+ doEmptyWindowTest(false, false, false);
+ }
+
+
+ @Test
+ public void testEmptyConnectionWindowUpdateConnectionFirst() throws
Exception {
+ doEmptyWindowTest(true, false, true);
+ }
+
+
+ @Test
+ public void testEmptyConnectionWindowUpdateStreamFirst() throws Exception {
+ doEmptyWindowTest(false, false, true);
+ }
+
+
+ @Test
+ public void testEmptyStreamWindowUpdateConnectionFirst() throws Exception {
+ doEmptyWindowTest(true, true, false);
+ }
+
+
+ @Test
+ public void testEmptyStreamWindowUpdateStreamFirst() throws Exception {
+ doEmptyWindowTest(false, true, false);
+ }
+
+
+ // No point testing when both Stream and Connection are unlimited
+
+
+ private void doEmptyWindowTest(boolean expandConnectionFirst, boolean
connectionUnlimited,
+ boolean streamUnlimited) throws Exception {
+ int blockCount = 4;
+
+ enableHttp2();
+
+ Tomcat tomcat = getTomcatInstance();
+
+ Context ctxt = tomcat.addContext("", null);
+ Tomcat.addServlet(ctxt, "simple", new SimpleServlet());
+ ctxt.addServletMappingDecoded("/simple", "simple");
+ Wrapper w = Tomcat.addServlet(ctxt, "async", new
AsyncServlet(blockCount));
+ w.setAsyncSupported(true);
+ ctxt.addServletMappingDecoded("/async", "async");
+ tomcat.start();
+
+ openClientConnection();
+ doHttpUpgrade();
+ sendClientPreface();
+ validateHttp2InitialResponse();
+
+ byte[] frameHeader = new byte[9];
+ ByteBuffer headersPayload = ByteBuffer.allocate(128);
+ buildGetRequest(frameHeader, headersPayload, null, 3, "/async");
+ writeFrame(frameHeader, headersPayload);
+
+ // Reset connection window size after intial response
+ sendWindowUpdate(0, SimpleServlet.CONTENT_LENGTH);
+
+ if (connectionUnlimited) {
+ // Effectively unlimited for this test
+ sendWindowUpdate(0, blockCount * BLOCK_SIZE * 2);
+ }
+ if (streamUnlimited) {
+ // Effectively unlimited for this test
+ sendWindowUpdate(3, blockCount * BLOCK_SIZE * 2);
+ }
+
+ // Headers
+ parser.readFrame(true);
+ // Body
+ int startingWindowSize =
ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE;
+
+ while (output.getBytesRead() < startingWindowSize) {
+ parser.readFrame(true);
+ }
+
+ // Check that the right number of bytes were received
+ Assert.assertEquals(startingWindowSize, output.getBytesRead());
+
+ // Increase the Window size (50% of total body)
+ int windowSizeIncrease = blockCount * BLOCK_SIZE / 2;
+ if (expandConnectionFirst) {
+ sendWindowUpdate(0, windowSizeIncrease);
+ sendWindowUpdate(3, windowSizeIncrease);
+ } else {
+ sendWindowUpdate(3, windowSizeIncrease);
+ sendWindowUpdate(0, windowSizeIncrease);
+ }
+
+ while (output.getBytesRead() < startingWindowSize +
windowSizeIncrease) {
+ parser.readFrame(true);
+ }
+
+ // Check that the right number of bytes were received
+ Assert.assertEquals(startingWindowSize + windowSizeIncrease,
output.getBytesRead());
+
+ // Increase the Window size
+ if (expandConnectionFirst) {
+ sendWindowUpdate(0, windowSizeIncrease);
+ sendWindowUpdate(3, windowSizeIncrease);
+ } else {
+ sendWindowUpdate(3, windowSizeIncrease);
+ sendWindowUpdate(0, windowSizeIncrease);
+ }
+
+ while (!output.getTrace().endsWith("3-EndOfStream\n")) {
+ parser.readFrame(true);
+ }
+
+ // Check that the right number of bytes were received
+ Assert.assertEquals(blockCount * BLOCK_SIZE, output.getBytesRead());
+ }
+
+
+ public static class AsyncServlet extends HttpServlet {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int blockLimit;
+
+ public AsyncServlet(int blockLimit) {
+ this.blockLimit = blockLimit;
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest request, HttpServletResponse
response)
+ throws IOException {
+
+ final AsyncContext asyncContext = request.startAsync();
+
+ response.setStatus(HttpServletResponse.SC_OK);
+ response.setContentType("application/binary");
+
+ final ServletOutputStream output = response.getOutputStream();
+ output.setWriteListener(new WriteListener() {
+
+ int blockCount;
+ byte[] bytes = new byte[BLOCK_SIZE];
+
+
+ @Override
+ public void onWritePossible() throws IOException {
+ while (output.isReady()) {
+ blockCount++;
+ output.write(bytes);
+ if (blockCount > blockLimit) {
+ asyncContext.complete();
+ return;
+ }
+ }
+ }
+
+
+ @Override
+ public void onError(Throwable t) {
+ t.printStackTrace();
+ }
+ });
+ }
+ }
+}
Propchange: tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: tomcat/trunk/webapps/docs/changelog.xml
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1838275&r1=1838274&r2=1838275&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Fri Aug 17 19:16:56 2018
@@ -55,6 +55,10 @@
<bug>62620</bug>: Fix corruption of response bodies when writing large
bodies using asynchronous processing over HTTP/2. (markt)
</fix>
+ <fix>
+ Additional fixes for output corruption of response bodies when writing
+ large bodies using asynchronous processing over HTTP/2. (markt)
+ </fix>
</changelog>
</subsection>
<subsection name="Other">
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]