Author: markt Date: Thu Jan 8 13:10:46 2015 New Revision: 1650278 URL: http://svn.apache.org/r1650278 Log: Untested first pass at pushing down NIO2 writes.
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java?rev=1650278&r1=1650277&r2=1650278&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java Thu Jan 8 13:10:46 2015 @@ -17,20 +17,10 @@ package org.apache.coyote.http11; -import java.io.EOFException; import java.io.IOException; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.coyote.Response; -import org.apache.tomcat.util.buf.ByteBufferHolder; import org.apache.tomcat.util.net.Nio2Channel; -import org.apache.tomcat.util.net.Nio2Endpoint; -import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper; /** * Output buffer implementation for NIO2. @@ -50,178 +40,16 @@ public class InternalNio2OutputBuffer ex // ------------------------------------------------------ Protected Methods @Override - protected void addToBB(byte[] buf, int offset, int length) - throws IOException { - - if (length == 0) - return; - if (socketWrapper == null || socketWrapper.getSocket() == null) - return; - - if (isBlocking()) { - while (length > 0) { - int thisTime = transfer(buf, offset, length, socketWrapper.socketWriteBuffer); - length = length - thisTime; - offset = offset + thisTime; - if (socketWrapper.socketWriteBuffer.remaining() == 0) { - flushBuffer(true); - } - } - } else { - // FIXME: Possible new behavior: - // If there's non blocking abuse (like a test writing 1MB in a single - // "non blocking" write), then block until the previous write is - // done rather than continue buffering - // Also allows doing autoblocking - // Could be "smart" with coordination with the main CoyoteOutputStream to - // indicate the end of a write - // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS)) - if (((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire()) { - synchronized (((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) { - // No pending completion handler, so writing to the main buffer - // is possible - int thisTime = transfer(buf, offset, length, socketWrapper.socketWriteBuffer); - length = length - thisTime; - offset = offset + thisTime; - if (length > 0) { - // Remaining data must be buffered - addToBuffers(buf, offset, length); - } - flushBufferInternal(false, true); - } - } else { - synchronized (((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) { - addToBuffers(buf, offset, length); - } - } - } - } - - - private void addToBuffers(byte[] buf, int offset, int length) { - ByteBuffer buffer = ByteBuffer.allocate(length); - buffer.put(buf, offset, length); - socketWrapper.bufferedWrites.add(new ByteBufferHolder(buffer, false)); + protected void addToBB(byte[] buf, int offset, int length) throws IOException { + socketWrapper.write(isBlocking(), buf, offset, length); } - /** - * Callback to write data from the buffer. - */ @Override protected boolean flushBuffer(boolean block) throws IOException { - if (socketWrapper.getError() != null) { - throw socketWrapper.getError(); - } - return flushBufferInternal(block, false); + return socketWrapper.flush(block); } - private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOException { - if (socketWrapper == null || socketWrapper.getSocket() == null) - return false; - - if (block) { - if (!isBlocking()) { - // The final flush is blocking, but the processing was using - // non blocking so wait until an async write is done - try { - if (((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS)) { - ((Nio2SocketWrapper)socketWrapper).writePending.release(); - } - } catch (InterruptedException e) { - // Ignore timeout - } - } - try { - if (socketWrapper.bufferedWrites.size() > 0) { - for (ByteBufferHolder holder : socketWrapper.bufferedWrites) { - holder.flip(); - ByteBuffer buffer = holder.getBuf(); - while (buffer.hasRemaining()) { - if (socketWrapper.getSocket().write(buffer).get(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { - throw new EOFException(sm.getString("iob.failedwrite")); - } - } - } - socketWrapper.bufferedWrites.clear(); - } - if (!socketWrapper.writeBufferFlipped) { - socketWrapper.socketWriteBuffer.flip(); - socketWrapper.writeBufferFlipped = true; - } - while (socketWrapper.socketWriteBuffer.hasRemaining()) { - if (socketWrapper.getSocket().write(socketWrapper.socketWriteBuffer).get(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { - throw new EOFException(sm.getString("iob.failedwrite")); - } - } - } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } else { - throw new IOException(e); - } - } catch (InterruptedException e) { - throw new IOException(e); - } catch (TimeoutException e) { - throw new SocketTimeoutException(); - } - socketWrapper.socketWriteBuffer.clear(); - socketWrapper.writeBufferFlipped = false; - return false; - } else { - synchronized (((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) { - if (hasPermit || ((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire()) { - if (!socketWrapper.writeBufferFlipped) { - socketWrapper.socketWriteBuffer.flip(); - socketWrapper.writeBufferFlipped = true; - } - Nio2Endpoint.startInline(); - if (socketWrapper.bufferedWrites.size() > 0) { - // Gathering write of the main buffer plus all leftovers - ArrayList<ByteBuffer> arrayList = new ArrayList<>(); - if (socketWrapper.socketWriteBuffer.hasRemaining()) { - arrayList.add(socketWrapper.socketWriteBuffer); - } - for (ByteBufferHolder buffer : socketWrapper.bufferedWrites) { - buffer.flip(); - arrayList.add(buffer.getBuf()); - } - socketWrapper.bufferedWrites.clear(); - ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]); - socketWrapper.getSocket().write(array, 0, array.length, socketWrapper.getTimeout(), - TimeUnit.MILLISECONDS, array, ((Nio2SocketWrapper)socketWrapper).gatheringWriteCompletionHandler); - } else if (socketWrapper.socketWriteBuffer.hasRemaining()) { - // Regular write - socketWrapper.getSocket().write(socketWrapper.socketWriteBuffer, socketWrapper.getTimeout(), - TimeUnit.MILLISECONDS, socketWrapper.socketWriteBuffer, ((Nio2SocketWrapper)socketWrapper).writeCompletionHandler); - } else { - // Nothing was written - ((Nio2SocketWrapper)socketWrapper).writePending.release(); - } - Nio2Endpoint.endInline(); - if (((Nio2SocketWrapper)socketWrapper).writePending.availablePermits() > 0) { - if (socketWrapper.socketWriteBuffer.remaining() == 0) { - socketWrapper.socketWriteBuffer.clear(); - socketWrapper.writeBufferFlipped = false; - } - } - } - return socketWrapper.hasMoreDataToFlush() || hasBufferedData() || socketWrapper.getError() != null; - } - } - } - - - @Override - public boolean hasDataToWrite() { - synchronized (((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) { - return socketWrapper.hasMoreDataToFlush() || hasBufferedData() || socketWrapper.getError() != null; - } - } - - protected boolean hasBufferedData() { - return socketWrapper.bufferedWrites.size() > 0; - } @Override protected void registerWriteInterest() { Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650278&r1=1650277&r2=1650278&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 8 13:10:46 2015 @@ -722,16 +722,13 @@ public class Nio2Endpoint extends Abstra private volatile boolean readPending = false; private volatile boolean interest = true; - private final int maxWrite; - // TODO These are public for now to aid refactoring - public final CompletionHandler<Integer, ByteBuffer> writeCompletionHandler; - public final CompletionHandler<Long, ByteBuffer[]> gatheringWriteCompletionHandler; - public final Semaphore writePending = new Semaphore(1); + private final CompletionHandler<Integer, ByteBuffer> writeCompletionHandler; + private final CompletionHandler<Long, ByteBuffer[]> gatheringWriteCompletionHandler; + private final Semaphore writePending = new Semaphore(1); public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) { super(channel, endpoint); - maxWrite = channel.getBufHandler().getWriteBuffer().capacity(); this.readCompletionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() { @Override @@ -1091,57 +1088,105 @@ public class Nio2Endpoint extends Abstra @Override - public void write(boolean block, byte[] b, int off, int len) throws IOException { - int leftToWrite = len; - int offset = off; - - while (leftToWrite > 0) { - int writeThisLoop; - int writtenThisLoop; + public void write(boolean block, byte[] buf, int off, int len) throws IOException { + if (len == 0) + return; + if (getSocket() == null) + return; - if (leftToWrite > maxWrite) { - writeThisLoop = maxWrite; - } else { - writeThisLoop = leftToWrite; - } - - writtenThisLoop = writeInternal(block, b, offset, writeThisLoop); - if (writtenThisLoop < 0) { - throw new EOFException(); - } - if (!block && writePending.availablePermits() == 0) { - // Prevent concurrent writes in non blocking mode, - // leftover data has to be buffered - return; + if (block) { + while (len > 0) { + int thisTime = transfer(buf, off, len, socketWriteBuffer); + len = len - thisTime; + off = off + thisTime; + if (socketWriteBuffer.remaining() == 0) { + flush(true); + } } - offset += writtenThisLoop; - leftToWrite -= writtenThisLoop; - - if (writtenThisLoop < writeThisLoop) { - break; + } else { + // FIXME: Possible new behavior: + // If there's non blocking abuse (like a test writing 1MB in a single + // "non blocking" write), then block until the previous write is + // done rather than continue buffering + // Also allows doing autoblocking + // Could be "smart" with coordination with the main CoyoteOutputStream to + // indicate the end of a write + // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS)) + if (writePending.tryAcquire()) { + synchronized (writeCompletionHandler) { + // No pending completion handler, so writing to the main buffer + // is possible + int thisTime = transfer(buf, off, len, socketWriteBuffer); + len = len - thisTime; + off = off + thisTime; + if (len > 0) { + // Remaining data must be buffered + addToBuffers(buf, off, len); + } + flush(false, true); + } + } else { + synchronized (writeCompletionHandler) { + addToBuffers(buf, off, len); + } } } } @Override - protected int doWrite(ByteBuffer buffer, boolean block, boolean flip) - throws IOException { - // TODO Auto-generated method stub + protected int doWrite(ByteBuffer buffer, boolean block, boolean flip) throws IOException { + // NO-OP for NIO2 since write() is over-ridden above. return 0; } - private int writeInternal(boolean block, byte[] b, int off, int len) - throws IOException { - ByteBuffer writeBuffer = getSocket().getBufHandler().getWriteBuffer(); - int written = 0; + + @Override + public boolean flush(boolean block) throws IOException { + if (getError() != null) { + throw getError(); + } + return super.flush(block); + } + + + @Override + protected boolean flush(boolean block, boolean hasPermit) throws IOException { + if (getSocket() == null) + return false; + if (block) { - writeBuffer.clear(); - writeBuffer.put(b, off, len); - writeBuffer.flip(); - writeBufferFlipped = true; try { - written = getSocket().write(writeBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue(); + if (writePending.tryAcquire(getTimeout(), TimeUnit.MILLISECONDS)) { + writePending.release(); + } else { + // TODO + } + } catch (InterruptedException e) { + // Ignore timeout + } + try { + if (bufferedWrites.size() > 0) { + for (ByteBufferHolder holder : bufferedWrites) { + holder.flip(); + ByteBuffer buffer = holder.getBuf(); + while (buffer.hasRemaining()) { + if (getSocket().write(buffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { + throw new EOFException(sm.getString("iob.failedwrite")); + } + } + } + bufferedWrites.clear(); + } + if (!writeBufferFlipped) { + socketWriteBuffer.flip(); + writeBufferFlipped = true; + } + while (socketWriteBuffer.hasRemaining()) { + if (getSocket().write(socketWriteBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { + throw new EOFException(sm.getString("iob.failedwrite")); + } + } } catch (ExecutionException e) { if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); @@ -1151,22 +1196,60 @@ public class Nio2Endpoint extends Abstra } catch (InterruptedException e) { throw new IOException(e); } catch (TimeoutException e) { - SocketTimeoutException ex = new SocketTimeoutException(); - throw ex; + throw new SocketTimeoutException(); } + socketWriteBuffer.clear(); + writeBufferFlipped = false; + return false; } else { - if (writePending.tryAcquire()) { - writeBuffer.clear(); - writeBuffer.put(b, off, len); - writeBuffer.flip(); - writeBufferFlipped = true; - Nio2Endpoint.startInline(); - getSocket().write(writeBuffer, getTimeout(), TimeUnit.MILLISECONDS, writeBuffer, writeCompletionHandler); - Nio2Endpoint.endInline(); - written = len; + synchronized (writeCompletionHandler) { + if (hasPermit || writePending.tryAcquire()) { + if (!writeBufferFlipped) { + socketWriteBuffer.flip(); + writeBufferFlipped = true; + } + Nio2Endpoint.startInline(); + if (bufferedWrites.size() > 0) { + // Gathering write of the main buffer plus all leftovers + ArrayList<ByteBuffer> arrayList = new ArrayList<>(); + if (socketWriteBuffer.hasRemaining()) { + arrayList.add(socketWriteBuffer); + } + for (ByteBufferHolder buffer : bufferedWrites) { + buffer.flip(); + arrayList.add(buffer.getBuf()); + } + bufferedWrites.clear(); + ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]); + getSocket().write(array, 0, array.length, getTimeout(), + TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); + } else if (socketWriteBuffer.hasRemaining()) { + // Regular write + getSocket().write(socketWriteBuffer, getTimeout(), + TimeUnit.MILLISECONDS, socketWriteBuffer, writeCompletionHandler); + } else { + // Nothing was written + writePending.release(); + } + Nio2Endpoint.endInline(); + if (writePending.availablePermits() > 0) { + if (socketWriteBuffer.remaining() == 0) { + socketWriteBuffer.clear(); + writeBufferFlipped = false; + } + } + } + return hasMoreDataToFlush() || hasBufferedData() || getError() != null; } } - return written; + } + + + @Override + public boolean hasDataToWrite() { + synchronized (writeCompletionHandler) { + return hasMoreDataToFlush() || hasBufferedData() || getError() != null; + } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1650278&r1=1650277&r2=1650278&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan 8 13:10:46 2015 @@ -70,9 +70,8 @@ public abstract class SocketWrapperBase< */ private final Object writeThreadLock = new Object(); - // TODO These being public is a temporary hack to simplify refactoring - public volatile ByteBuffer socketWriteBuffer; - public volatile boolean writeBufferFlipped; + protected volatile ByteBuffer socketWriteBuffer; + protected volatile boolean writeBufferFlipped; /** * For "non-blocking" writes use an external set of buffers. Although the @@ -80,8 +79,7 @@ public abstract class SocketWrapperBase< * the possible need to write HTTP headers, there may be more than one write * to the OutputBuffer. */ - // TODO This being public is a temporary hack to simplify refactoring - public final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites = + protected final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites = new LinkedBlockingDeque<>(); /** @@ -180,8 +178,7 @@ public abstract class SocketWrapperBase< } public Object getWriteThreadLock() { return writeThreadLock; } - // TODO This being public is a temporary hack to simplify refactoring - public boolean hasMoreDataToFlush() { + protected boolean hasMoreDataToFlush() { return (writeBufferFlipped && socketWriteBuffer.remaining() > 0) || (!writeBufferFlipped && socketWriteBuffer.position() > 0); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org