Author: markt
Date: Wed Aug 15 15:06:56 2018
New Revision: 1838104
URL: http://svn.apache.org/viewvc?rev=1838104&view=rev
Log:
Fix https://bz.apache.org/bugzilla/show_bug.cgi?id=62620
Fix corruption of response bodies when writing large bodies using asynchronous
processing over HTTP/2.
Added:
tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java (with props)
Modified:
tomcat/trunk/java/org/apache/coyote/http2/Stream.java
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
tomcat/trunk/webapps/docs/changelog.xml
Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1838104&r1=1838103&r2=1838104&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Wed Aug 15 15:06:56
2018
@@ -43,6 +43,7 @@ import org.apache.tomcat.util.buf.Messag
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.http.parser.Host;
import org.apache.tomcat.util.net.ApplicationBufferHandler;
+import org.apache.tomcat.util.net.WriteBuffer;
import org.apache.tomcat.util.res.StringManager;
class Stream extends AbstractStream implements HeaderEmitter {
@@ -712,9 +713,10 @@ class Stream extends AbstractStream impl
}
- class StreamOutputBuffer implements HttpOutputBuffer {
+ class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink {
private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
+ private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024);
private volatile long written = 0;
private volatile boolean closed = false;
private volatile boolean endOfStreamSent = false;
@@ -742,6 +744,7 @@ class Stream extends AbstractStream impl
// Only flush if we have more data to write and the buffer
// is full
if (flush(true, coyoteResponse.getWriteListener() ==
null)) {
+ writeBuffer.add(chunk);
break;
}
}
@@ -751,7 +754,33 @@ class Stream extends AbstractStream impl
}
final synchronized boolean flush(boolean block) throws IOException {
- return flush(false, block);
+ /*
+ * Need to ensure that there is exactly one call to flush even when
+ * there is no data to write.
+ * Too few calls (i.e. zero) and the end of stream message is not
+ * sent for a completed asynchronous write.
+ * Too many calls and the end of stream message is sent too soon
and
+ * trailer headers are not sent.
+ */
+ boolean dataLeft = buffer.position() > 0;
+ boolean flushed = false;
+
+ if (dataLeft) {
+ dataLeft = flush(false, block);
+ flushed = true;
+ }
+
+ if (!dataLeft) {
+ if (writeBuffer.isEmpty()) {
+ if (!flushed) {
+ dataLeft = flush(false, block);
+ }
+ } else {
+ dataLeft = writeBuffer.write(this, block);
+ }
+ }
+
+ return dataLeft;
}
private final synchronized boolean flush(boolean writeInProgress,
boolean block)
@@ -827,6 +856,23 @@ class Stream extends AbstractStream impl
public void flush() throws IOException {
flush(true);
}
+
+ @Override
+ public boolean writeFromBuffer(ByteBuffer src, boolean blocking)
throws IOException {
+ int chunkLimit = src.limit();
+ int offset = 0;
+ while (src.remaining() > 0) {
+ int thisTime = Math.min(buffer.remaining(), src.remaining());
+ src.limit(src.position() + thisTime);
+ buffer.put(src);
+ src.limit(chunkLimit);
+ written += offset;
+ if (flush(true, blocking)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1838104&r1=1838103&r2=1838104&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Wed Aug 15
15:06:56 2018
@@ -48,7 +48,6 @@ import javax.net.ssl.SSLSession;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.jsse.JSSESupport;
@@ -575,19 +574,14 @@ public class Nio2Endpoint extends Abstra
synchronized (writeCompletionHandler) {
if (nBytes.intValue() < 0) {
failed(new
EOFException(sm.getString("iob.failedwrite")), attachment);
- } else if (bufferedWrites.size() > 0) {
+ } else if (!writeBuffer.isEmpty()) {
nestedWriteCompletionCount.get().incrementAndGet();
// Continue writing data using a gathering write
List<ByteBuffer> arrayList = new ArrayList<>();
if (attachment.hasRemaining()) {
arrayList.add(attachment);
}
- for (ByteBufferHolder buffer : bufferedWrites) {
- buffer.flip();
- arrayList.add(buffer.getBuf());
- }
- bufferedWrites.clear();
- ByteBuffer[] array = arrayList.toArray(new
ByteBuffer[arrayList.size()]);
+ ByteBuffer[] array =
writeBuffer.transferToListAsArray(arrayList);
getSocket().write(array, 0, array.length,
toNio2Timeout(getWriteTimeout()),
TimeUnit.MILLISECONDS,
array, gatheringWriteCompletionHandler);
@@ -633,7 +627,7 @@ public class Nio2Endpoint extends Abstra
synchronized (writeCompletionHandler) {
if (nBytes.longValue() < 0) {
failed(new
EOFException(sm.getString("iob.failedwrite")), attachment);
- } else if (bufferedWrites.size() > 0 ||
arrayHasData(attachment)) {
+ } else if (!writeBuffer.isEmpty() ||
arrayHasData(attachment)) {
// Continue writing data
nestedWriteCompletionCount.get().incrementAndGet();
List<ByteBuffer> arrayList = new ArrayList<>();
@@ -642,12 +636,7 @@ public class Nio2Endpoint extends Abstra
arrayList.add(buffer);
}
}
- for (ByteBufferHolder buffer : bufferedWrites) {
- buffer.flip();
- arrayList.add(buffer.getBuf());
- }
- bufferedWrites.clear();
- ByteBuffer[] array = arrayList.toArray(new
ByteBuffer[arrayList.size()]);
+ ByteBuffer[] array =
writeBuffer.transferToListAsArray(arrayList);
getSocket().write(array, 0, array.length,
toNio2Timeout(getWriteTimeout()),
TimeUnit.MILLISECONDS,
array, gatheringWriteCompletionHandler);
@@ -1187,11 +1176,11 @@ public class Nio2Endpoint extends Abstra
off = off + thisTime;
if (len > 0) {
// Remaining data must be buffered
- addToBuffers(buf, off, len);
+ writeBuffer.add(buf, off, len);
}
flushNonBlocking(true);
} else {
- addToBuffers(buf, off, len);
+ writeBuffer.add(buf, off, len);
}
}
}
@@ -1222,11 +1211,11 @@ public class Nio2Endpoint extends Abstra
transfer(from, socketBufferHandler.getWriteBuffer());
if (from.remaining() > 0) {
// Remaining data must be buffered
- addToBuffers(from);
+ writeBuffer.add(from);
}
flushNonBlocking(true);
} else {
- addToBuffers(from);
+ writeBuffer.add(from);
}
}
}
@@ -1297,18 +1286,13 @@ public class Nio2Endpoint extends Abstra
synchronized (writeCompletionHandler) {
if (hasPermit || writePending.tryAcquire()) {
socketBufferHandler.configureWriteBufferForRead();
- if (bufferedWrites.size() > 0) {
+ if (!writeBuffer.isEmpty()) {
// Gathering write of the main buffer plus all
leftovers
List<ByteBuffer> arrayList = new ArrayList<>();
if
(socketBufferHandler.getWriteBuffer().hasRemaining()) {
arrayList.add(socketBufferHandler.getWriteBuffer());
}
- for (ByteBufferHolder buffer : bufferedWrites) {
- buffer.flip();
- arrayList.add(buffer.getBuf());
- }
- bufferedWrites.clear();
- ByteBuffer[] array = arrayList.toArray(new
ByteBuffer[arrayList.size()]);
+ ByteBuffer[] array =
writeBuffer.transferToListAsArray(arrayList);
Nio2Endpoint.startInline();
getSocket().write(array, 0, array.length,
toNio2Timeout(getWriteTimeout()),
TimeUnit.MILLISECONDS, array,
gatheringWriteCompletionHandler);
@@ -1336,7 +1320,7 @@ public class Nio2Endpoint extends Abstra
public boolean hasDataToWrite() {
synchronized (writeCompletionHandler) {
return !socketBufferHandler.isWriteBufferEmpty() ||
- bufferedWrites.size() > 0 || getError() != null;
+ !writeBuffer.isEmpty() || getError() != null;
}
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1838104&r1=1838103&r2=1838104&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Wed Aug
15 15:06:56 2018
@@ -19,9 +19,7 @@ package org.apache.tomcat.util.net;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
-import java.util.Iterator;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -30,7 +28,6 @@ import java.util.concurrent.locks.Reentr
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
import org.apache.tomcat.util.res.StringManager;
public abstract class SocketWrapperBase<E> {
@@ -80,17 +77,17 @@ public abstract class SocketWrapperBase<
protected volatile SocketBufferHandler socketBufferHandler = null;
/**
+ * The max size of the individual buffered write buffers
+ */
+ protected int bufferedWriteSize = 64 * 1024; // 64k default write buffer
+
+ /**
* For "non-blocking" writes use an external set of buffers. Although the
* API only allows one non-blocking write at a time, due to buffering and
* the possible need to write HTTP headers, there may be more than one
write
* to the OutputBuffer.
*/
- protected final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites = new
LinkedBlockingDeque<>();
-
- /**
- * The max size of the buffered write buffer
- */
- protected int bufferedWriteSize = 64 * 1024; // 64k default write buffer
+ protected final WriteBuffer writeBuffer = new
WriteBuffer(bufferedWriteSize);
public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
this.socket = socket;
@@ -252,7 +249,7 @@ public abstract class SocketWrapperBase<
public SocketBufferHandler getSocketBufferHandler() { return
socketBufferHandler; }
public boolean hasDataToWrite() {
- return !socketBufferHandler.isWriteBufferEmpty() ||
bufferedWrites.size() > 0;
+ return !socketBufferHandler.isWriteBufferEmpty() ||
!writeBuffer.isEmpty();
}
/**
@@ -282,7 +279,7 @@ public abstract class SocketWrapperBase<
if (socketBufferHandler == null) {
throw new IllegalStateException(sm.getString("socket.closed"));
}
- return socketBufferHandler.isWriteBufferWritable() &&
bufferedWrites.size() == 0;
+ return socketBufferHandler.isWriteBufferWritable() &&
writeBuffer.isEmpty();
}
@@ -505,7 +502,7 @@ public abstract class SocketWrapperBase<
* @throws IOException If an IO error occurs during the write
*/
protected void writeNonBlocking(byte[] buf, int off, int len) throws
IOException {
- if (bufferedWrites.size() == 0 &&
socketBufferHandler.isWriteBufferWritable()) {
+ if (writeBuffer.isEmpty() &&
socketBufferHandler.isWriteBufferWritable()) {
socketBufferHandler.configureWriteBufferForWrite();
int thisTime = transfer(buf, off, len,
socketBufferHandler.getWriteBuffer());
len = len - thisTime;
@@ -527,7 +524,7 @@ public abstract class SocketWrapperBase<
if (len > 0) {
// Remaining data must be buffered
- addToBuffers(buf, off, len);
+ writeBuffer.add(buf, off, len);
}
}
@@ -544,18 +541,18 @@ public abstract class SocketWrapperBase<
* @throws IOException If an IO error occurs during the write
*/
protected void writeNonBlocking(ByteBuffer from) throws IOException {
- if (bufferedWrites.size() == 0 &&
socketBufferHandler.isWriteBufferWritable()) {
+ if (writeBuffer.isEmpty() &&
socketBufferHandler.isWriteBufferWritable()) {
writeNonBlockingInternal(from);
}
if (from.remaining() > 0) {
// Remaining data must be buffered
- addToBuffers(from);
+ writeBuffer.add(from);
}
}
- private boolean writeNonBlockingInternal(ByteBuffer from) throws
IOException {
+ boolean writeNonBlockingInternal(ByteBuffer from) throws IOException {
if (socketBufferHandler.isWriteBufferEmpty()) {
return writeByteBufferNonBlocking(from);
} else {
@@ -627,16 +624,8 @@ public abstract class SocketWrapperBase<
protected void flushBlocking() throws IOException {
doWrite(true);
- if (bufferedWrites.size() > 0) {
- Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
- while (bufIter.hasNext()) {
- ByteBufferHolder buffer = bufIter.next();
- buffer.flip();
- writeBlocking(buffer.getBuf());
- if (buffer.getBuf().remaining() == 0) {
- bufIter.remove();
- }
- }
+ if (!writeBuffer.isEmpty()) {
+ writeBuffer.write(this, true);
if (!socketBufferHandler.isWriteBufferEmpty()) {
doWrite(true);
@@ -655,16 +644,8 @@ public abstract class SocketWrapperBase<
dataLeft = !socketBufferHandler.isWriteBufferEmpty();
}
- if (!dataLeft && bufferedWrites.size() > 0) {
- Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
- while (!dataLeft && bufIter.hasNext()) {
- ByteBufferHolder buffer = bufIter.next();
- buffer.flip();
- dataLeft = writeNonBlockingInternal(buffer.getBuf());
- if (buffer.getBuf().remaining() == 0) {
- bufIter.remove();
- }
- }
+ if (!dataLeft && !writeBuffer.isEmpty()) {
+ dataLeft = writeBuffer.write(this, false);
if (!dataLeft && !socketBufferHandler.isWriteBufferEmpty()) {
doWrite(false);
@@ -706,29 +687,6 @@ public abstract class SocketWrapperBase<
protected abstract void doWrite(boolean block, ByteBuffer from) throws
IOException;
- protected void addToBuffers(byte[] buf, int offset, int length) {
- ByteBufferHolder holder = getByteBufferHolder(length);
- holder.getBuf().put(buf, offset, length);
- }
-
-
- protected void addToBuffers(ByteBuffer from) {
- ByteBufferHolder holder = getByteBufferHolder(from.remaining());
- holder.getBuf().put(from);
- }
-
-
- private ByteBufferHolder getByteBufferHolder(int capacity) {
- ByteBufferHolder holder = bufferedWrites.peekLast();
- if (holder == null || holder.isFlipped() ||
holder.getBuf().remaining() < capacity) {
- ByteBuffer buffer =
ByteBuffer.allocate(Math.max(bufferedWriteSize, capacity));
- holder = new ByteBufferHolder(buffer, false);
- bufferedWrites.add(holder);
- }
- return holder;
- }
-
-
public void processSocket(SocketEvent socketStatus, boolean dispatch) {
endpoint.processSocket(this, socketStatus, dispatch);
}
Added: tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java?rev=1838104&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java Wed Aug 15
15:06:56 2018
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.tomcat.util.buf.ByteBufferHolder;
+
+/**
+ * Provides an expandable set of buffers for writes. Non-blocking writes can be
+ * of any size and may not be able to be written immediately or wholly
contained
+ * in the buffer used to perform the writes to the next layer. This class
+ * provides a buffering capability to allow such writes to return immediately
+ * and also allows for the user provided buffers to be re-used / recycled as
+ * required.
+ */
+public class WriteBuffer {
+
+ private final int bufferSize;
+
+ private final LinkedBlockingDeque<ByteBufferHolder> buffers = new
LinkedBlockingDeque<>();
+
+ public WriteBuffer(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+
+ void add(byte[] buf, int offset, int length) {
+ ByteBufferHolder holder = getByteBufferHolder(length);
+ holder.getBuf().put(buf, offset, length);
+ }
+
+
+ public void add(ByteBuffer from) {
+ ByteBufferHolder holder = getByteBufferHolder(from.remaining());
+ holder.getBuf().put(from);
+ }
+
+
+ private ByteBufferHolder getByteBufferHolder(int capacity) {
+ ByteBufferHolder holder = buffers.peekLast();
+ if (holder == null || holder.isFlipped() ||
holder.getBuf().remaining() < capacity) {
+ ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferSize,
capacity));
+ holder = new ByteBufferHolder(buffer, false);
+ buffers.add(holder);
+ }
+ return holder;
+ }
+
+
+ public boolean isEmpty() {
+ return buffers.isEmpty();
+ }
+
+
+ ByteBuffer[] transferToListAsArray(List<ByteBuffer> target) {
+ for (ByteBufferHolder buffer : buffers) {
+ buffer.flip();
+ target.add(buffer.getBuf());
+ }
+ buffers.clear();
+ return target.toArray(new ByteBuffer[target.size()]);
+ }
+
+
+ boolean write(SocketWrapperBase<?> socketWrapper, boolean blocking) throws
IOException {
+ Iterator<ByteBufferHolder> bufIter = buffers.iterator();
+ boolean dataLeft = false;
+ while (!dataLeft && bufIter.hasNext()) {
+ ByteBufferHolder buffer = bufIter.next();
+ buffer.flip();
+ if (blocking) {
+ socketWrapper.writeBlocking(buffer.getBuf());
+ } else {
+ dataLeft =
socketWrapper.writeNonBlockingInternal(buffer.getBuf());
+ }
+ if (buffer.getBuf().remaining() == 0) {
+ bufIter.remove();
+ }
+ }
+ return dataLeft;
+ }
+
+
+ public boolean write(Sink sink, boolean blocking) throws IOException {
+ Iterator<ByteBufferHolder> bufIter = buffers.iterator();
+ boolean dataLeft = false;
+ while (!dataLeft && bufIter.hasNext()) {
+ ByteBufferHolder buffer = bufIter.next();
+ buffer.flip();
+ dataLeft = sink.writeFromBuffer(buffer.getBuf(), blocking);
+ if (!dataLeft) {
+ bufIter.remove();
+ }
+ }
+ return dataLeft;
+ }
+
+
+ /**
+ * Interface implemented by clients of the WriteBuffer to enable data to be
+ * written back out from the buffer.
+ */
+ public interface Sink {
+ boolean writeFromBuffer(ByteBuffer buffer, boolean block) throws
IOException;
+ }
+}
Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: tomcat/trunk/webapps/docs/changelog.xml
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1838104&r1=1838103&r2=1838104&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Wed Aug 15 15:06:56 2018
@@ -51,6 +51,10 @@
Fix potential deadlocks when using asynchronous Servlet processing with
HTTP/2 connectors. (markt)
</fix>
+ <fix>
+ <bug>62620</bug>: Fix corruption of response bodies when writing large
+ bodies using asynchronous processing over HTTP/2. (markt)
+ </fix>
</changelog>
</subsection>
<subsection name="Other">
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]