Author: remm Date: Fri May 18 09:39:26 2018 New Revision: 1831839 URL: http://svn.apache.org/viewvc?rev=1831839&view=rev Log: As read and write are symmetric, remove code duplication
Modified: tomcat/trunk/ (props changed) tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Propchange: tomcat/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri May 18 09:39:26 2018 @@ -1,2 +1,3 @@ /tomcat/tc8.0.x/trunk:1809644 /tomcat/tc8.5.x/trunk:1802799,1808880,1809646 +/tomcat/trunk:1816751-1816762 Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1831839&r1=1831838&r2=1831839&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Fri May 18 09:39:26 2018 @@ -862,6 +862,7 @@ public class Nio2Endpoint extends Abstra * Internal state tracker for scatter/gather operations. */ private static class OperationState<A> { + private final boolean read; private final ByteBuffer[] buffers; private final int offset; private final int length; @@ -871,9 +872,12 @@ public class Nio2Endpoint extends Abstra private final BlockingMode block; private final CompletionCheck check; private final CompletionHandler<Long, ? super A> handler; - private OperationState(ByteBuffer[] buffers, int offset, int length, + private final Semaphore semaphore; + private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { + CompletionCheck check, CompletionHandler<Long, ? 
super A> handler, + Semaphore semaphore) { + this.read = read; this.buffers = buffers; this.offset = offset; this.length = length; @@ -883,6 +887,7 @@ public class Nio2Endpoint extends Abstra this.attachment = attachment; this.check = check; this.handler = handler; + this.semaphore = semaphore; } private volatile long nBytes = 0; private volatile CompletionState state = CompletionState.PENDING; @@ -915,8 +920,9 @@ public class Nio2Endpoint extends Abstra return CompletionState.NOT_DONE; } } - OperationState<A> state = new OperationState<>(dsts, offset, length, block, timeout, unit, attachment, check, handler); - ScatterReadCompletionHandler<A> completion = new ScatterReadCompletionHandler<>(); + OperationState<A> state = new OperationState<>(true, dsts, offset, length, block, + timeout, unit, attachment, check, handler, readPending); + VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); Nio2Endpoint.startInline(); long nBytes = 0; if (!socketBufferHandler.isReadBufferEmpty()) { @@ -954,84 +960,6 @@ public class Nio2Endpoint extends Abstra return state.state; } - private class ScatterReadCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { - @Override - public void completed(Long nBytes, OperationState<A> state) { - if (nBytes.longValue() < 0) { - failed(new EOFException(), state); - } else { - state.nBytes += nBytes.longValue(); - CompletionState currentState = Nio2Endpoint.isInline() ? 
CompletionState.INLINE : CompletionState.DONE; - boolean complete = true; - boolean completion = true; - if (state.check != null) { - switch (state.check.callHandler(currentState, state.buffers, state.offset, state.length)) { - case CONTINUE: - complete = false; - break; - case DONE: - break; - case NONE: - completion = false; - break; - } - } - if (complete) { - boolean notify = false; - readPending.release(); - if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) { - notify = true; - } else { - state.state = currentState; - } - if (completion && state.handler != null) { - state.handler.completed(Long.valueOf(state.nBytes), state.attachment); - } - if (notify) { - synchronized (state) { - state.state = currentState; - state.notify(); - } - } - } else { - getSocket().read(state.buffers, state.offset, state.length, - state.timeout, state.unit, state, this); - } - } - } - @Override - public void failed(Throwable exc, OperationState<A> state) { - IOException ioe; - if (exc instanceof IOException) { - ioe = (IOException) exc; - } else { - ioe = new IOException(exc); - } - setError(ioe); - boolean notify = false; - readPending.release(); - readPending.release(); - if (state.block == BlockingMode.BLOCK) { - notify = true; - } else { - state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; - } - if (exc instanceof AsynchronousCloseException) { - // If already closed, don't call onError and close again - return; - } - if (state.handler != null) { - state.handler.failed(ioe, state.attachment); - } - if (notify) { - synchronized (state) { - state.state = Nio2Endpoint.isInline() ? 
CompletionState.ERROR : CompletionState.DONE; - state.notify(); - } - } - } - } - @Override public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, @@ -1068,8 +996,9 @@ public class Nio2Endpoint extends Abstra return CompletionState.ERROR; } } - OperationState<A> state = new OperationState<>(srcs, offset, length, block, timeout, unit, attachment, check, handler); - GatherWriteCompletionHandler<A> completion = new GatherWriteCompletionHandler<>(); + OperationState<A> state = new OperationState<>(false, srcs, offset, length, block, + timeout, unit, attachment, check, handler, writePending); + VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); Nio2Endpoint.startInline(); // It should be less necessary to check the buffer state as it is easy to flush before getSocket().write(srcs, offset, length, timeout, unit, state, completion); @@ -1092,7 +1021,7 @@ public class Nio2Endpoint extends Abstra return state.state; } - private class GatherWriteCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { + private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { @Override public void completed(Long nBytes, OperationState<A> state) { if (nBytes.longValue() < 0) { @@ -1116,7 +1045,7 @@ public class Nio2Endpoint extends Abstra } if (complete) { boolean notify = false; - writePending.release(); + state.semaphore.release(); if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) { notify = true; } else { @@ -1132,8 +1061,13 @@ public class Nio2Endpoint extends Abstra } } } else { - getSocket().write(state.buffers, state.offset, state.length, - state.timeout, state.unit, state, this); + if (state.read) { + getSocket().read(state.buffers, state.offset, state.length, + state.timeout, state.unit, state, this); + } else { + getSocket().write(state.buffers, state.offset, state.length, + 
state.timeout, state.unit, state, this); + } } } } @@ -1147,7 +1081,7 @@ public class Nio2Endpoint extends Abstra } setError(ioe); boolean notify = false; - writePending.release(); + state.semaphore.release(); if (state.block == BlockingMode.BLOCK) { notify = true; } else { --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org