Author: remm
Date: Wed Apr 8 09:53:45 2015
New Revision: 1672050

URL: http://svn.apache.org/r1672050
Log:
Add read, fixes and cleanups.
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java?rev=1672050&r1=1672049&r2=1672050&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java Wed Apr 8 09:53:45 2015 @@ -135,6 +135,12 @@ public class Nio2Channel implements Asyn sc.read(dst, timeout, unit, attachment, handler); } + public <A> void read(ByteBuffer[] dsts, + int offset, int length, long timeout, TimeUnit unit, + A attachment, CompletionHandler<Long,? super A> handler) { + sc.read(dsts, offset, length, timeout, unit, attachment, handler);; + } + @Override public Future<Integer> write(ByteBuffer src) { return sc.write(src); Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1672050&r1=1672049&r2=1672050&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Wed Apr 8 09:53:45 2015 @@ -1044,7 +1044,60 @@ public class Nio2Endpoint extends Abstra } // TODO: NIO2 style scatter/gather methods. - // TODO: SecureNio2Channel gather would need to be improved + // TODO: SecureNio2Channel scatter/gather would need to be improved + + public enum CompletionState { + /** + * Operation is pending and the completion handler will + * be called later. 
+ */ + PENDING, + /** + * The operation completed inline, and the completion handler + * will not be called unless an error occurred. + */ + INLINE, + /** + * The operation completed, but not inline. + */ + DONE + } + + public enum CompletionHandlerCall { + /** + * Operation should continue, the completion handler shouldn't be + * called. + */ + CONTINUE, + /** + * The operation completed but the completion handler shouldn't be + * called. This is possibly useful if the operation completed + * inline. + */ + NONE, + /** + * The operation is complete, call the completion handler. + */ + DONE + } + + public interface CompletionCheck { + /** + * Return true if enough data has been read or written and the + * handler should be notified. Return false if the IO is + * incomplete (data has not been fully written while it should, + * or more data read is needed for further processing) and should + * be continued before the completion handler is called. + * + * @param state of the operation (done or done inline since the + * IO call is done) + * @param buffers ByteBuffer[] that has been passed to the + * original IO call + * @param offset that has been passed to the original IO call + * @param length that has been passed to the original IO call + */ + public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length); + } /** * Internal state tracker for scatter/gather operations. 
@@ -1069,13 +1122,64 @@ public class Nio2Endpoint extends Abstra this.attachment = attachment; this.check = check; this.handler = handler; - this.pos = offset; } private long nBytes = 0; - private int pos; private CompletionState state = CompletionState.PENDING; } + private class ScatterReadCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { + @Override + public void completed(Long nBytes, OperationState<A> state) { + if (nBytes.intValue() < 0) { + failed(new EOFException(), state); + } else { + state.nBytes += nBytes.longValue(); + CompletionState currentState = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE; + boolean complete = true; + boolean completion = true; + if (state.check != null) { + switch (state.check.callHandler(currentState, state.buffers, state.offset, state.length)) { + case CONTINUE: + complete = false; + break; + case DONE: + break; + case NONE: + completion = false; + break; + } + } + if (complete) { + readPending.release(); + state.state = currentState; + if (completion) { + state.handler.completed(Long.valueOf(state.nBytes), state.attachment); + } + } else { + getSocket().read(state.buffers, state.offset, state.length, + state.timeout, state.unit, state, this); + } + } + } + @Override + public void failed(Throwable exc, OperationState<A> state) { + IOException ioe; + if (exc instanceof IOException) { + ioe = (IOException) exc; + } else { + ioe = new IOException(exc); + } + Nio2SocketWrapper.this.setError(ioe); + readPending.release(); + if (exc instanceof AsynchronousCloseException) { + // If already closed, don't call onError and close again + return; + } + state.state = Nio2Endpoint.isInline() ? 
CompletionState.INLINE : CompletionState.DONE; + state.handler.failed(ioe, state.attachment); + } + } + private class GatherWriteCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { @Override public void completed(Long nBytes, OperationState<A> state) { @@ -1083,38 +1187,30 @@ public class Nio2Endpoint extends Abstra failed(new EOFException(), state); } else { state.nBytes += nBytes.longValue(); - if (state.pos == state.offset + state.length) { + CompletionState currentState = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE; + boolean complete = true; + boolean completion = true; + if (state.check != null) { + switch (state.check.callHandler(currentState, state.buffers, state.offset, state.length)) { + case CONTINUE: + complete = false; + break; + case DONE: + break; + case NONE: + completion = false; + break; + } + } + if (complete) { writePending.release(); - state.state = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE; - if (state.check == null - || state.check.callHandler(state.state, state.buffers, state.offset, state.length) - == CompletionHandlerCall.DONE) { + state.state = currentState; + if (completion) { state.handler.completed(Long.valueOf(state.nBytes), state.attachment); } } else { - if (state.check == null) { - // Call completion handler - writePending.release(); - state.handler.completed(Long.valueOf(state.nBytes), state.attachment); - } else { - boolean inline = Nio2Endpoint.isInline(); - switch (state.check.callHandler(inline ? CompletionState.INLINE : CompletionState.DONE, - state.buffers, state.offset, state.length)) { - case CONTINUE: - getSocket().write(state.buffers, state.offset, state.length, - state.timeout, state.unit, state, this); - break; - case DONE: - writePending.release(); - state.state = inline ? 
CompletionState.INLINE : CompletionState.DONE; - state.handler.completed(Long.valueOf(state.nBytes), state.attachment); - break; - case NONE: - writePending.release(); - state.state = inline ? CompletionState.INLINE : CompletionState.DONE; - break; - } - } + getSocket().write(state.buffers, state.offset, state.length, + state.timeout, state.unit, state, this); } } } @@ -1133,62 +1229,10 @@ public class Nio2Endpoint extends Abstra } } - public enum CompletionState { - /** - * Operation is pending and the completion handler will - * be called later. - */ - PENDING, - /** - * The operation completed inline, and the completion handler - * will not be called unless an error occurred. - */ - INLINE, - /** - * The operation completed, but not inline. - */ - DONE - } - - public enum CompletionHandlerCall { - /** - * Operation should continue, the completion handler shouldn't be - * called. - */ - CONTINUE, - /** - * The operation completed but the completion handler shouldn't be - * called. This is possibly useful if the operation completed - * inline. - */ - NONE, - /** - * The operation is complete, call the completion handler. - */ - DONE - } - - public interface CompletionCheck { - /** - * Return true if enough data has been read or written and the - * handler should be notified. Return false if the IO is - * incomplete (data has not been fully written while it should, - * or more data read is needed for further processing) and should - * be continued before the completion handler is called. 
- * - * @param state of the operation (done or done inline since the - * IO call is done) - * @param buffers ByteBuffer[] that has been passed to the - * original IO call - * @param offset that has been passed to the original IO call - * @param length that has been passed to the original IO call - */ - public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length); - } - /** * This utility CompletionCheck will cause the write to fully write - * all remaining data. + * all remaining data. If the operation completes inline, the + * completion handler will not be called. */ public static final CompletionCheck COMPLETE_WRITE = new CompletionCheck() { @Override @@ -1204,7 +1248,8 @@ public class Nio2Endpoint extends Abstra /** * This utility CompletionCheck will cause the completion handler - * to be called once some data has been read. + * to be called once some data has been read. If the operation + * completes inline, the completion handler will not be called. 
*/ public static final CompletionCheck READ_DATA = new CompletionCheck() { @Override @@ -1240,8 +1285,7 @@ public class Nio2Endpoint extends Abstra OperationState<A> state = new OperationState<>(dsts, offset, length, timeout, unit, attachment, check, handler); if (readPending.tryAcquire()) { Nio2Endpoint.startInline(); - // FIXME: Add scatter read to Nio2Channel and ScatterReadCompletionHandler class - //getSocket().read(dsts, offset, length, timeout, unit, state, new ScatterReadCompletionHandler<>()); + getSocket().read(dsts, offset, length, timeout, unit, state, new ScatterReadCompletionHandler<>()); Nio2Endpoint.endInline(); } else { throw new ReadPendingException(); Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java?rev=1672050&r1=1672049&r2=1672050&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java Wed Apr 8 09:53:45 2015 @@ -794,6 +794,37 @@ public class SecureNio2Channel extends N } @Override + public <A> void read(ByteBuffer[] dsts, int offset, int length, + long timeout, TimeUnit unit, A attachment, + final CompletionHandler<Long, ? super A> handler) { + if (offset < 0 || dsts == null || (offset + length) > dsts.length) { + throw new IllegalArgumentException(); + } + ByteBuffer dst = null; + // Find the first buffer with space + for (int i = 0; i < length; i++) { + ByteBuffer current = dsts[offset + i]; + if (current.position() < current.limit()) { + dst = current; + } + } + if (dst == null) { + throw new IllegalArgumentException(); + } + CompletionHandler<Integer, ? 
super A> handlerWrapper = new CompletionHandler<Integer, A>() { + @Override + public void completed(Integer result, A attachment) { + handler.completed(Long.valueOf(result.longValue()), attachment); + } + @Override + public void failed(Throwable exc, A attachment) { + handler.failed(exc, attachment); + } + }; + read(dst, timeout, unit, attachment, handlerWrapper); + } + + @Override public <A> void write(final ByteBuffer src, final long timeout, final TimeUnit unit, final A attachment, final CompletionHandler<Integer, ? super A> handler) { // Check state --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org