Author: remm
Date: Fri May 18 09:39:26 2018
New Revision: 1831839

URL: http://svn.apache.org/viewvc?rev=1831839&view=rev
Log:
As read and write are symetric, remove code duplication

Modified:
    tomcat/trunk/   (props changed)
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java

Propchange: tomcat/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri May 18 09:39:26 2018
@@ -1,2 +1,3 @@
 /tomcat/tc8.0.x/trunk:1809644
 /tomcat/tc8.5.x/trunk:1802799,1808880,1809646
+/tomcat/trunk:1816751-1816762

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1831839&r1=1831838&r2=1831839&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Fri May 18 
09:39:26 2018
@@ -862,6 +862,7 @@ public class Nio2Endpoint extends Abstra
          * Internal state tracker for scatter/gather operations.
          */
         private static class OperationState<A> {
+            private final boolean read;
             private final ByteBuffer[] buffers;
             private final int offset;
             private final int length;
@@ -871,9 +872,12 @@ public class Nio2Endpoint extends Abstra
             private final BlockingMode block;
             private final CompletionCheck check;
             private final CompletionHandler<Long, ? super A> handler;
-            private OperationState(ByteBuffer[] buffers, int offset, int 
length,
+            private final Semaphore semaphore;
+            private OperationState(boolean read, ByteBuffer[] buffers, int 
offset, int length,
                     BlockingMode block, long timeout, TimeUnit unit, A 
attachment,
-                    CompletionCheck check, CompletionHandler<Long, ? super A> 
handler) {
+                    CompletionCheck check, CompletionHandler<Long, ? super A> 
handler,
+                    Semaphore semaphore) {
+                this.read = read;
                 this.buffers = buffers;
                 this.offset = offset;
                 this.length = length;
@@ -883,6 +887,7 @@ public class Nio2Endpoint extends Abstra
                 this.attachment = attachment;
                 this.check = check;
                 this.handler = handler;
+                this.semaphore = semaphore;
             }
             private volatile long nBytes = 0;
             private volatile CompletionState state = CompletionState.PENDING;
@@ -915,8 +920,9 @@ public class Nio2Endpoint extends Abstra
                     return CompletionState.NOT_DONE;
                 }
             }
-            OperationState<A> state = new OperationState<>(dsts, offset, 
length, block, timeout, unit, attachment, check, handler);
-            ScatterReadCompletionHandler<A> completion = new 
ScatterReadCompletionHandler<>();
+            OperationState<A> state = new OperationState<>(true, dsts, offset, 
length, block,
+                    timeout, unit, attachment, check, handler, readPending);
+            VectoredIOCompletionHandler<A> completion = new 
VectoredIOCompletionHandler<>();
             Nio2Endpoint.startInline();
             long nBytes = 0;
             if (!socketBufferHandler.isReadBufferEmpty()) {
@@ -954,84 +960,6 @@ public class Nio2Endpoint extends Abstra
             return state.state;
         }
 
-        private class ScatterReadCompletionHandler<A> implements 
CompletionHandler<Long, OperationState<A>> {
-            @Override
-            public void completed(Long nBytes, OperationState<A> state) {
-                if (nBytes.longValue() < 0) {
-                    failed(new EOFException(), state);
-                } else {
-                    state.nBytes += nBytes.longValue();
-                    CompletionState currentState = Nio2Endpoint.isInline() ? 
CompletionState.INLINE : CompletionState.DONE;
-                    boolean complete = true;
-                    boolean completion = true;
-                    if (state.check != null) {
-                        switch (state.check.callHandler(currentState, 
state.buffers, state.offset, state.length)) {
-                        case CONTINUE:
-                            complete = false;
-                            break;
-                        case DONE:
-                            break;
-                        case NONE:
-                            completion = false;
-                            break;
-                        }
-                    }
-                    if (complete) {
-                        boolean notify = false;
-                        readPending.release();
-                        if (state.block == BlockingMode.BLOCK && currentState 
!= CompletionState.INLINE) {
-                            notify = true;
-                        } else {
-                            state.state = currentState;
-                        }
-                        if (completion && state.handler != null) {
-                            
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
-                        }
-                        if (notify) {
-                            synchronized (state) {
-                                state.state = currentState;
-                                state.notify();
-                            }
-                        }
-                    } else {
-                        getSocket().read(state.buffers, state.offset, 
state.length,
-                                state.timeout, state.unit, state, this);
-                    }
-                }
-            }
-            @Override
-            public void failed(Throwable exc, OperationState<A> state) {
-                IOException ioe;
-                if (exc instanceof IOException) {
-                    ioe = (IOException) exc;
-                } else {
-                    ioe = new IOException(exc);
-                }
-                setError(ioe);
-                boolean notify = false;
-                readPending.release();
-                readPending.release();
-                if (state.block == BlockingMode.BLOCK) {
-                    notify = true;
-                } else {
-                    state.state = Nio2Endpoint.isInline() ? 
CompletionState.ERROR : CompletionState.DONE;
-                }
-                if (exc instanceof AsynchronousCloseException) {
-                    // If already closed, don't call onError and close again
-                    return;
-                }
-                if (state.handler != null) {
-                    state.handler.failed(ioe, state.attachment);
-                }
-                if (notify) {
-                    synchronized (state) {
-                        state.state = Nio2Endpoint.isInline() ? 
CompletionState.ERROR : CompletionState.DONE;
-                        state.notify();
-                    }
-                }
-            }
-        }
-
         @Override
         public <A> CompletionState write(ByteBuffer[] srcs, int offset, int 
length,
                 BlockingMode block, long timeout, TimeUnit unit, A attachment,
@@ -1068,8 +996,9 @@ public class Nio2Endpoint extends Abstra
                     return CompletionState.ERROR;
                 }
             }
-            OperationState<A> state = new OperationState<>(srcs, offset, 
length, block, timeout, unit, attachment, check, handler);
-            GatherWriteCompletionHandler<A> completion = new 
GatherWriteCompletionHandler<>();
+            OperationState<A> state = new OperationState<>(false, srcs, 
offset, length, block,
+                    timeout, unit, attachment, check, handler, writePending);
+            VectoredIOCompletionHandler<A> completion = new 
VectoredIOCompletionHandler<>();
             Nio2Endpoint.startInline();
             // It should be less necessary to check the buffer state as it is 
easy to flush before
             getSocket().write(srcs, offset, length, timeout, unit, state, 
completion);
@@ -1092,7 +1021,7 @@ public class Nio2Endpoint extends Abstra
             return state.state;
         }
 
-        private class GatherWriteCompletionHandler<A> implements 
CompletionHandler<Long, OperationState<A>> {
+        private class VectoredIOCompletionHandler<A> implements 
CompletionHandler<Long, OperationState<A>> {
             @Override
             public void completed(Long nBytes, OperationState<A> state) {
                 if (nBytes.longValue() < 0) {
@@ -1116,7 +1045,7 @@ public class Nio2Endpoint extends Abstra
                     }
                     if (complete) {
                         boolean notify = false;
-                        writePending.release();
+                        state.semaphore.release();
                         if (state.block == BlockingMode.BLOCK && currentState 
!= CompletionState.INLINE) {
                             notify = true;
                         } else {
@@ -1132,8 +1061,13 @@ public class Nio2Endpoint extends Abstra
                             }
                         }
                     } else {
-                        getSocket().write(state.buffers, state.offset, 
state.length,
-                                state.timeout, state.unit, state, this);
+                        if (state.read) {
+                            getSocket().read(state.buffers, state.offset, 
state.length,
+                                    state.timeout, state.unit, state, this);
+                        } else {
+                            getSocket().write(state.buffers, state.offset, 
state.length,
+                                    state.timeout, state.unit, state, this);
+                        }
                     }
                 }
             }
@@ -1147,7 +1081,7 @@ public class Nio2Endpoint extends Abstra
                 }
                 setError(ioe);
                 boolean notify = false;
-                writePending.release();
+                state.semaphore.release();
                 if (state.block == BlockingMode.BLOCK) {
                     notify = true;
                 } else {



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to