Author: markt
Date: Thu Jan 8 13:10:46 2015
New Revision: 1650278
URL: http://svn.apache.org/r1650278
Log:
Untested first pass at pushing down NIO2 writes.
Modified:
tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java?rev=1650278&r1=1650277&r2=1650278&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java Thu Jan 8 13:10:46 2015
@@ -17,20 +17,10 @@
package org.apache.coyote.http11;
-import java.io.EOFException;
import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.coyote.Response;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
import org.apache.tomcat.util.net.Nio2Channel;
-import org.apache.tomcat.util.net.Nio2Endpoint;
-import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper;
/**
* Output buffer implementation for NIO2.
@@ -50,178 +40,16 @@ public class InternalNio2OutputBuffer ex
// ------------------------------------------------------ Protected Methods
@Override
- protected void addToBB(byte[] buf, int offset, int length)
- throws IOException {
-
- if (length == 0)
- return;
- if (socketWrapper == null || socketWrapper.getSocket() == null)
- return;
-
- if (isBlocking()) {
- while (length > 0) {
- int thisTime = transfer(buf, offset, length,
socketWrapper.socketWriteBuffer);
- length = length - thisTime;
- offset = offset + thisTime;
- if (socketWrapper.socketWriteBuffer.remaining() == 0) {
- flushBuffer(true);
- }
- }
- } else {
- // FIXME: Possible new behavior:
- // If there's non blocking abuse (like a test writing 1MB in a single
- // "non blocking" write), then block until the previous write is
- // done rather than continue buffering
- // Also allows doing autoblocking
- // Could be "smart" with coordination with the main CoyoteOutputStream to
- // indicate the end of a write
- // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
- if (((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire()) {
- synchronized
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
- // No pending completion handler, so writing to the main
buffer
- // is possible
- int thisTime = transfer(buf, offset, length,
socketWrapper.socketWriteBuffer);
- length = length - thisTime;
- offset = offset + thisTime;
- if (length > 0) {
- // Remaining data must be buffered
- addToBuffers(buf, offset, length);
- }
- flushBufferInternal(false, true);
- }
- } else {
- synchronized
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
- addToBuffers(buf, offset, length);
- }
- }
- }
- }
-
-
- private void addToBuffers(byte[] buf, int offset, int length) {
- ByteBuffer buffer = ByteBuffer.allocate(length);
- buffer.put(buf, offset, length);
- socketWrapper.bufferedWrites.add(new ByteBufferHolder(buffer, false));
+ protected void addToBB(byte[] buf, int offset, int length) throws IOException {
+ socketWrapper.write(isBlocking(), buf, offset, length);
}
- /**
- * Callback to write data from the buffer.
- */
@Override
protected boolean flushBuffer(boolean block) throws IOException {
- if (socketWrapper.getError() != null) {
- throw socketWrapper.getError();
- }
- return flushBufferInternal(block, false);
+ return socketWrapper.flush(block);
}
- private boolean flushBufferInternal(boolean block, boolean hasPermit)
throws IOException {
- if (socketWrapper == null || socketWrapper.getSocket() == null)
- return false;
-
- if (block) {
- if (!isBlocking()) {
- // The final flush is blocking, but the processing was using
- // non blocking so wait until an async write is done
- try {
- if
(((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire(socketWrapper.getTimeout(),
TimeUnit.MILLISECONDS)) {
-
((Nio2SocketWrapper)socketWrapper).writePending.release();
- }
- } catch (InterruptedException e) {
- // Ignore timeout
- }
- }
- try {
- if (socketWrapper.bufferedWrites.size() > 0) {
- for (ByteBufferHolder holder :
socketWrapper.bufferedWrites) {
- holder.flip();
- ByteBuffer buffer = holder.getBuf();
- while (buffer.hasRemaining()) {
- if
(socketWrapper.getSocket().write(buffer).get(socketWrapper.getTimeout(),
TimeUnit.MILLISECONDS).intValue() < 0) {
- throw new
EOFException(sm.getString("iob.failedwrite"));
- }
- }
- }
- socketWrapper.bufferedWrites.clear();
- }
- if (!socketWrapper.writeBufferFlipped) {
- socketWrapper.socketWriteBuffer.flip();
- socketWrapper.writeBufferFlipped = true;
- }
- while (socketWrapper.socketWriteBuffer.hasRemaining()) {
- if
(socketWrapper.getSocket().write(socketWrapper.socketWriteBuffer).get(socketWrapper.getTimeout(),
TimeUnit.MILLISECONDS).intValue() < 0) {
- throw new
EOFException(sm.getString("iob.failedwrite"));
- }
- }
- } catch (ExecutionException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- } else {
- throw new IOException(e);
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } catch (TimeoutException e) {
- throw new SocketTimeoutException();
- }
- socketWrapper.socketWriteBuffer.clear();
- socketWrapper.writeBufferFlipped = false;
- return false;
- } else {
- synchronized
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
- if (hasPermit ||
((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire()) {
- if (!socketWrapper.writeBufferFlipped) {
- socketWrapper.socketWriteBuffer.flip();
- socketWrapper.writeBufferFlipped = true;
- }
- Nio2Endpoint.startInline();
- if (socketWrapper.bufferedWrites.size() > 0) {
- // Gathering write of the main buffer plus all
leftovers
- ArrayList<ByteBuffer> arrayList = new ArrayList<>();
- if (socketWrapper.socketWriteBuffer.hasRemaining()) {
- arrayList.add(socketWrapper.socketWriteBuffer);
- }
- for (ByteBufferHolder buffer :
socketWrapper.bufferedWrites) {
- buffer.flip();
- arrayList.add(buffer.getBuf());
- }
- socketWrapper.bufferedWrites.clear();
- ByteBuffer[] array = arrayList.toArray(new
ByteBuffer[arrayList.size()]);
- socketWrapper.getSocket().write(array, 0,
array.length, socketWrapper.getTimeout(),
- TimeUnit.MILLISECONDS, array,
((Nio2SocketWrapper)socketWrapper).gatheringWriteCompletionHandler);
- } else if (socketWrapper.socketWriteBuffer.hasRemaining())
{
- // Regular write
-
socketWrapper.getSocket().write(socketWrapper.socketWriteBuffer,
socketWrapper.getTimeout(),
- TimeUnit.MILLISECONDS,
socketWrapper.socketWriteBuffer,
((Nio2SocketWrapper)socketWrapper).writeCompletionHandler);
- } else {
- // Nothing was written
-
((Nio2SocketWrapper)socketWrapper).writePending.release();
- }
- Nio2Endpoint.endInline();
- if
(((Nio2SocketWrapper)socketWrapper).writePending.availablePermits() > 0) {
- if (socketWrapper.socketWriteBuffer.remaining() == 0) {
- socketWrapper.socketWriteBuffer.clear();
- socketWrapper.writeBufferFlipped = false;
- }
- }
- }
- return socketWrapper.hasMoreDataToFlush() || hasBufferedData()
|| socketWrapper.getError() != null;
- }
- }
- }
-
-
- @Override
- public boolean hasDataToWrite() {
- synchronized
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
- return socketWrapper.hasMoreDataToFlush() || hasBufferedData() ||
socketWrapper.getError() != null;
- }
- }
-
- protected boolean hasBufferedData() {
- return socketWrapper.bufferedWrites.size() > 0;
- }
@Override
protected void registerWriteInterest() {
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650278&r1=1650277&r2=1650278&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 8 13:10:46 2015
@@ -722,16 +722,13 @@ public class Nio2Endpoint extends Abstra
private volatile boolean readPending = false;
private volatile boolean interest = true;
- private final int maxWrite;
- // TODO These are public for now to aid refactoring
- public final CompletionHandler<Integer, ByteBuffer>
writeCompletionHandler;
- public final CompletionHandler<Long, ByteBuffer[]>
gatheringWriteCompletionHandler;
- public final Semaphore writePending = new Semaphore(1);
+ private final CompletionHandler<Integer, ByteBuffer>
writeCompletionHandler;
+ private final CompletionHandler<Long, ByteBuffer[]>
gatheringWriteCompletionHandler;
+ private final Semaphore writePending = new Semaphore(1);
public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) {
super(channel, endpoint);
- maxWrite = channel.getBufHandler().getWriteBuffer().capacity();
this.readCompletionHandler = new CompletionHandler<Integer,
SocketWrapperBase<Nio2Channel>>() {
@Override
@@ -1091,57 +1088,105 @@ public class Nio2Endpoint extends Abstra
@Override
- public void write(boolean block, byte[] b, int off, int len) throws
IOException {
- int leftToWrite = len;
- int offset = off;
-
- while (leftToWrite > 0) {
- int writeThisLoop;
- int writtenThisLoop;
+ public void write(boolean block, byte[] buf, int off, int len) throws
IOException {
+ if (len == 0)
+ return;
+ if (getSocket() == null)
+ return;
- if (leftToWrite > maxWrite) {
- writeThisLoop = maxWrite;
- } else {
- writeThisLoop = leftToWrite;
- }
-
- writtenThisLoop = writeInternal(block, b, offset,
writeThisLoop);
- if (writtenThisLoop < 0) {
- throw new EOFException();
- }
- if (!block && writePending.availablePermits() == 0) {
- // Prevent concurrent writes in non blocking mode,
- // leftover data has to be buffered
- return;
+ if (block) {
+ while (len > 0) {
+ int thisTime = transfer(buf, off, len, socketWriteBuffer);
+ len = len - thisTime;
+ off = off + thisTime;
+ if (socketWriteBuffer.remaining() == 0) {
+ flush(true);
+ }
}
- offset += writtenThisLoop;
- leftToWrite -= writtenThisLoop;
-
- if (writtenThisLoop < writeThisLoop) {
- break;
+ } else {
+ // FIXME: Possible new behavior:
+ // If there's non blocking abuse (like a test writing 1MB in a single
+ // "non blocking" write), then block until the previous write is
+ // done rather than continue buffering
+ // Also allows doing autoblocking
+ // Could be "smart" with coordination with the main CoyoteOutputStream to
+ // indicate the end of a write
+ // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
+ if (writePending.tryAcquire()) {
+ synchronized (writeCompletionHandler) {
+ // No pending completion handler, so writing to the
main buffer
+ // is possible
+ int thisTime = transfer(buf, off, len,
socketWriteBuffer);
+ len = len - thisTime;
+ off = off + thisTime;
+ if (len > 0) {
+ // Remaining data must be buffered
+ addToBuffers(buf, off, len);
+ }
+ flush(false, true);
+ }
+ } else {
+ synchronized (writeCompletionHandler) {
+ addToBuffers(buf, off, len);
+ }
}
}
}
@Override
- protected int doWrite(ByteBuffer buffer, boolean block, boolean flip)
- throws IOException {
- // TODO Auto-generated method stub
+ protected int doWrite(ByteBuffer buffer, boolean block, boolean flip) throws IOException {
+ // NO-OP for NIO2 since write() is over-ridden above.
return 0;
}
- private int writeInternal(boolean block, byte[] b, int off, int len)
- throws IOException {
- ByteBuffer writeBuffer =
getSocket().getBufHandler().getWriteBuffer();
- int written = 0;
+
+ @Override
+ public boolean flush(boolean block) throws IOException {
+ if (getError() != null) {
+ throw getError();
+ }
+ return super.flush(block);
+ }
+
+
+ @Override
+ protected boolean flush(boolean block, boolean hasPermit) throws
IOException {
+ if (getSocket() == null)
+ return false;
+
if (block) {
- writeBuffer.clear();
- writeBuffer.put(b, off, len);
- writeBuffer.flip();
- writeBufferFlipped = true;
try {
- written = getSocket().write(writeBuffer).get(getTimeout(),
TimeUnit.MILLISECONDS).intValue();
+ if (writePending.tryAcquire(getTimeout(),
TimeUnit.MILLISECONDS)) {
+ writePending.release();
+ } else {
+ // TODO
+ }
+ } catch (InterruptedException e) {
+ // Ignore timeout
+ }
+ try {
+ if (bufferedWrites.size() > 0) {
+ for (ByteBufferHolder holder : bufferedWrites) {
+ holder.flip();
+ ByteBuffer buffer = holder.getBuf();
+ while (buffer.hasRemaining()) {
+ if
(getSocket().write(buffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue()
< 0) {
+ throw new
EOFException(sm.getString("iob.failedwrite"));
+ }
+ }
+ }
+ bufferedWrites.clear();
+ }
+ if (!writeBufferFlipped) {
+ socketWriteBuffer.flip();
+ writeBufferFlipped = true;
+ }
+ while (socketWriteBuffer.hasRemaining()) {
+ if
(getSocket().write(socketWriteBuffer).get(getTimeout(),
TimeUnit.MILLISECONDS).intValue() < 0) {
+ throw new
EOFException(sm.getString("iob.failedwrite"));
+ }
+ }
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
@@ -1151,22 +1196,60 @@ public class Nio2Endpoint extends Abstra
} catch (InterruptedException e) {
throw new IOException(e);
} catch (TimeoutException e) {
- SocketTimeoutException ex = new SocketTimeoutException();
- throw ex;
+ throw new SocketTimeoutException();
}
+ socketWriteBuffer.clear();
+ writeBufferFlipped = false;
+ return false;
} else {
- if (writePending.tryAcquire()) {
- writeBuffer.clear();
- writeBuffer.put(b, off, len);
- writeBuffer.flip();
- writeBufferFlipped = true;
- Nio2Endpoint.startInline();
- getSocket().write(writeBuffer, getTimeout(),
TimeUnit.MILLISECONDS, writeBuffer, writeCompletionHandler);
- Nio2Endpoint.endInline();
- written = len;
+ synchronized (writeCompletionHandler) {
+ if (hasPermit || writePending.tryAcquire()) {
+ if (!writeBufferFlipped) {
+ socketWriteBuffer.flip();
+ writeBufferFlipped = true;
+ }
+ Nio2Endpoint.startInline();
+ if (bufferedWrites.size() > 0) {
+ // Gathering write of the main buffer plus all
leftovers
+ ArrayList<ByteBuffer> arrayList = new
ArrayList<>();
+ if (socketWriteBuffer.hasRemaining()) {
+ arrayList.add(socketWriteBuffer);
+ }
+ for (ByteBufferHolder buffer : bufferedWrites) {
+ buffer.flip();
+ arrayList.add(buffer.getBuf());
+ }
+ bufferedWrites.clear();
+ ByteBuffer[] array = arrayList.toArray(new
ByteBuffer[arrayList.size()]);
+ getSocket().write(array, 0, array.length,
getTimeout(),
+ TimeUnit.MILLISECONDS, array,
gatheringWriteCompletionHandler);
+ } else if (socketWriteBuffer.hasRemaining()) {
+ // Regular write
+ getSocket().write(socketWriteBuffer, getTimeout(),
+ TimeUnit.MILLISECONDS, socketWriteBuffer,
writeCompletionHandler);
+ } else {
+ // Nothing was written
+ writePending.release();
+ }
+ Nio2Endpoint.endInline();
+ if (writePending.availablePermits() > 0) {
+ if (socketWriteBuffer.remaining() == 0) {
+ socketWriteBuffer.clear();
+ writeBufferFlipped = false;
+ }
+ }
+ }
+ return hasMoreDataToFlush() || hasBufferedData() ||
getError() != null;
}
}
- return written;
+ }
+
+
+ @Override
+ public boolean hasDataToWrite() {
+ synchronized (writeCompletionHandler) {
+ return hasMoreDataToFlush() || hasBufferedData() || getError()
!= null;
+ }
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1650278&r1=1650277&r2=1650278&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan 8 13:10:46 2015
@@ -70,9 +70,8 @@ public abstract class SocketWrapperBase<
*/
private final Object writeThreadLock = new Object();
- // TODO These being public is a temporary hack to simplify refactoring
- public volatile ByteBuffer socketWriteBuffer;
- public volatile boolean writeBufferFlipped;
+ protected volatile ByteBuffer socketWriteBuffer;
+ protected volatile boolean writeBufferFlipped;
/**
* For "non-blocking" writes use an external set of buffers. Although the
@@ -80,8 +79,7 @@ public abstract class SocketWrapperBase<
* the possible need to write HTTP headers, there may be more than one write
* to the OutputBuffer.
*/
- // TODO This being public is a temporary hack to simplify refactoring
- public final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
+ protected final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
new LinkedBlockingDeque<>();
/**
@@ -180,8 +178,7 @@ public abstract class SocketWrapperBase<
}
public Object getWriteThreadLock() { return writeThreadLock; }
- // TODO This being public is a temporary hack to simplify refactoring
- public boolean hasMoreDataToFlush() {
+ protected boolean hasMoreDataToFlush() {
return (writeBufferFlipped && socketWriteBuffer.remaining() > 0) ||
(!writeBufferFlipped && socketWriteBuffer.position() > 0);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]