Author: remm
Date: Tue Apr 7 16:43:58 2015
New Revision: 1671886
URL: http://svn.apache.org/r1671886
Log:
- Start experimenting with a state wrapper over NIO2 for scatter/gather
operations.
- This effectively bypasses the processor (the completion handler would get to
call it if it wants to).
- All in a single block for easy removal if not useful in the end.
Modified:
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1671886&r1=1671885&r2=1671886&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Tue Apr 7
16:43:58 2015
@@ -31,6 +31,8 @@ import java.nio.channels.AsynchronousSoc
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
+import java.nio.channels.ReadPendingException;
+import java.nio.channels.WritePendingException;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
@@ -1041,6 +1043,244 @@ public class Nio2Endpoint extends Abstra
}
}
+ // TODO: NIO2 style scatter/gather methods.
+ // TODO: SecureNio2Channel gather would need to be improved
+
+ /**
+ * Internal state tracker for scatter/gather operations.
+ */
+ private class OperationState<A> {
+ private final ByteBuffer[] buffers;
+ private final int offset;
+ private final int length;
+ private final A attachment;
+ private final long timeout;
+ private final TimeUnit unit;
+ private final CompletionCheck check;
+ private final CompletionHandler<Long, ? super A> handler;
+ private OperationState(ByteBuffer[] buffers, int offset, int
length,
+ long timeout, TimeUnit unit, A attachment, CompletionCheck
check,
+ CompletionHandler<Long, ? super A> handler) {
+ this.buffers = buffers;
+ this.offset = offset;
+ this.length = length;
+ this.timeout = timeout;
+ this.unit = unit;
+ this.attachment = attachment;
+ this.check = check;
+ this.handler = handler;
+ this.pos = offset;
+ }
+ private long nBytes = 0;
+ private int pos;
+ private CompletionState state = CompletionState.PENDING;
+ }
+
+ private class GatherWriteCompletionHandler<A> implements
CompletionHandler<Long, OperationState<A>> {
+ @Override
+ public void completed(Long nBytes, OperationState<A> state) {
+ if (nBytes.longValue() < 0) {
+ failed(new EOFException(), state);
+ } else {
+ state.nBytes += nBytes.longValue();
+ if (state.pos == state.offset + state.length) {
+ writePending.release();
+ state.state = Nio2Endpoint.isInline() ?
CompletionState.INLINE : CompletionState.DONE;
+ if (state.check == null
+ || state.check.callHandler(state.state,
state.buffers, state.offset, state.length)
+ == CompletionHandlerCall.DONE) {
+
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+ }
+ } else {
+ if (state.check == null) {
+ // Call completion handler
+ writePending.release();
+
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+ } else {
+ boolean inline = Nio2Endpoint.isInline();
+ switch (state.check.callHandler(inline ?
CompletionState.INLINE : CompletionState.DONE,
+ state.buffers, state.offset,
state.length)) {
+ case CONTINUE:
+ getSocket().write(state.buffers, state.offset,
state.length,
+ state.timeout, state.unit, state,
this);
+ break;
+ case DONE:
+ writePending.release();
+ state.state = inline ? CompletionState.INLINE
: CompletionState.DONE;
+
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+ break;
+ case NONE:
+ writePending.release();
+ state.state = inline ? CompletionState.INLINE
: CompletionState.DONE;
+ break;
+ }
+ }
+ }
+ }
+ }
+ @Override
+ public void failed(Throwable exc, OperationState<A> state) {
+ IOException ioe;
+ if (exc instanceof IOException) {
+ ioe = (IOException) exc;
+ } else {
+ ioe = new IOException(exc);
+ }
+ Nio2SocketWrapper.this.setError(ioe);
+ writePending.release();
+ state.state = Nio2Endpoint.isInline() ? CompletionState.INLINE
: CompletionState.DONE;
+ state.handler.failed(ioe, state.attachment);
+ }
+ }
+
+ public enum CompletionState {
+ /**
+ * Operation is pending and the completion handler will
+ * be called later.
+ */
+ PENDING,
+ /**
+ * The operation completed inline, and the completion handler
+ * will not be called unless an error occurred.
+ */
+ INLINE,
+ /**
+ * The operation completed, but not inline.
+ */
+ DONE
+ }
+
+ public enum CompletionHandlerCall {
+ /**
+ * Operation should continue, the completion handler shouldn't be
+ * called.
+ */
+ CONTINUE,
+ /**
+ * The operation completed but the completion handler shouldn't be
+ * called. This is possibly useful if the operation completed
+ * inline.
+ */
+ NONE,
+ /**
+ * The operation is complete, call the completion handler.
+ */
+ DONE
+ }
+
+ public interface CompletionCheck {
+ /**
+ * Return true if enough data has been read or written and the
+ * handler should be notified.
+ * @param dsts ByteBuffer[] that has been passed to the
+ * original IO call
+ * @param offset that has been passed to the original IO call
+ * @param length that has been passed to the original IO call
+ */
+ public CompletionHandlerCall callHandler(CompletionState state,
ByteBuffer[] buffers, int offset, int length);
+ }
+
+ /**
+ * This utility CompletionCheck will cause the write to fully write
+ * all remaining data.
+ */
+ public static final CompletionCheck COMPLETE_WRITE = new
CompletionCheck() {
+ public CompletionHandlerCall callHandler(CompletionState state,
ByteBuffer[] buffers, int offset, int length) {
+ for (int i = 0; i < offset; i++) {
+ if (buffers[i].remaining() > 0) {
+ return CompletionHandlerCall.CONTINUE;
+ }
+ }
+ return (state == CompletionState.DONE) ?
CompletionHandlerCall.DONE : CompletionHandlerCall.NONE;
+ }
+ };
+
+ /**
+ * This utility CompletionCheck will cause the completion handler
+ * to be called once some data has been read.
+ */
+ public static final CompletionCheck READ_DATA = new CompletionCheck() {
+ public CompletionHandlerCall callHandler(CompletionState state,
ByteBuffer[] buffers, int offset, int length) {
+ return (state == CompletionState.DONE) ?
CompletionHandlerCall.DONE : CompletionHandlerCall.NONE;
+ }
+ };
+
+ /**
+ * Scatter read. The completion handler will be called once some
+ * data has been read or an error occurred. If a CompletionCheck
+ * object has been provided, the completion handler will only be
+ * called if the callHandler method returned true. If no
+ * CompletionCheck object has been provided, the completion handler
+ * will be called.
+ * @param dsts
+ * @param offset
+ * @param length
+ * @param timeout
+ * @param unit
+ * @param attachment
+ * @param check
+ * @param handler
+ * @return
+ */
+ // FIXME: @Override
+ public <A> CompletionState read(ByteBuffer[] dsts,
+ int offset, int length,
+ long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check,
+ CompletionHandler<Long, ? super A> handler) {
+ OperationState<A> state = new OperationState<A>(dsts, offset,
length, timeout, unit, attachment, check, handler);
+ if (readPending.tryAcquire()) {
+ Nio2Endpoint.startInline();
+ // FIXME: Add scatter read to Nio2Channel and
ScatterReadCompletionHandler class
+ //getSocket().read(dsts, offset, length, timeout, unit, state,
new ScatterReadCompletionHandler<>());
+ Nio2Endpoint.endInline();
+ } else {
+ throw new ReadPendingException();
+ }
+ return state.state;
+ }
+
+ public boolean isWritePending() {
+ synchronized (writeCompletionHandler) {
+ return writePending.availablePermits() == 0;
+ }
+ }
+
+ /**
+ * Gather write. The completion handler will be called once some
+ * data has been written or an error occurred. If a CompletionCheck
+ * object has been provided, the completion handler will only be
+ * called if the callHandler method returned true. If no
+ * CompletionCheck object has been provided, the completion handler
+ * will be called.
+ * @param srcs
+ * @param offset
+ * @param length
+ * @param timeout
+ * @param unit
+ * @param attachment
+ * @param check
+ * @param handler
+ * @return
+ */
+ // FIXME: @Override
+ public <A> CompletionState write(ByteBuffer[] srcs,
+ int offset, int length,
+ long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check,
+ CompletionHandler<Long, ? super A> handler) {
+ OperationState<A> state = new OperationState<A>(srcs, offset,
length, timeout, unit, attachment, check, handler);
+ if (writePending.tryAcquire()) {
+ Nio2Endpoint.startInline();
+ getSocket().write(srcs, offset, length, timeout, unit, state,
new GatherWriteCompletionHandler<>());
+ Nio2Endpoint.endInline();
+ } else {
+ throw new WritePendingException();
+ }
+ return state.state;
+ }
+
+ // TODO: End NIO2 style scatter/gather methods.
/* Callers of this method must:
* - have acquired the readPending semaphore
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]