This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/8.5.x by this push:
     new 38dc12b  Harmonize NIO2 isReadyForWrite with isReadyForRead code
38dc12b is described below

commit 38dc12b640dec45037b000c9af48738483553302
Author: remm <r...@apache.org>
AuthorDate: Wed Mar 27 11:35:49 2019 +0100

    Harmonize NIO2 isReadyForWrite with isReadyForRead code
    
    Following 8.5 #1717, it is better to redo isReadyForWrite with sync
    (synchronized on the write completion handler), as #1717 seems to
    indicate a leftover write issue.
    Also add some cleanups.
---
 java/org/apache/tomcat/util/net/Nio2Endpoint.java  | 121 ++++++++++++++-------
 .../apache/tomcat/util/net/SocketWrapperBase.java  |   2 +
 webapps/docs/changelog.xml                         |   3 +
 3 files changed, 84 insertions(+), 42 deletions(-)

diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index 992063f..9182aec 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -111,6 +111,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
 
     // --------------------------------------------------------- Public Methods
 
+
     /**
      * Number of keep-alive sockets.
      *
@@ -132,7 +133,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
     public void bind() throws Exception {
 
         // Create worker collection
-        if ( getExecutor() == null ) {
+        if (getExecutor() == null) {
             createExecutor();
         }
         if (getExecutor() instanceof ExecutorService) {
@@ -146,7 +147,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
         serverSock = AsynchronousServerSocketChannel.open(threadGroup);
         socketProperties.setProperties(serverSock);
         InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
-        serverSock.bind(addr,getAcceptCount());
+        serverSock.bind(addr, getAcceptCount());
 
         // Initialize thread count defaults for acceptor, poller
         if (acceptorThreadCount != 1) {
@@ -339,12 +340,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
     protected SocketProcessorBase<Nio2Channel> createSocketProcessor(
             SocketWrapperBase<Nio2Channel> socketWrapper, SocketEvent event) {
         return new SocketProcessor(socketWrapper, event);
-    }
+            }
 
     @Override
     protected Log getLog() {
         return log;
-    }
+        }
 
 
     @Override
@@ -715,9 +716,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
                 }
 
                 int nRead = fillReadBuffer(false);
-
                 boolean isReady = nRead > 0;
-
                 if (!isReady) {
                     readInterest = true;
                 }
@@ -727,6 +726,39 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
 
 
         @Override
+        public boolean isReadyForWrite() {
+            synchronized (writeCompletionHandler) {
+                if (writeNotify) {
+                    return true;
+                }
+
+                if (!writePending.tryAcquire()) {
+                    writeInterest = true;
+                    return false;
+                }
+
+                if (socketBufferHandler.isWriteBufferEmpty() && nonBlockingWriteBuffer.isEmpty()) {
+                    writePending.release();
+                    return true;
+                }
+
+                boolean dataLeft = false;
+                try {
+                    dataLeft = flushNonBlocking(true);
+                } catch (IOException e) {
+                    setError(e);
+                    return true;
+                }
+                boolean isReady = !dataLeft;
+                if (!isReady) {
+                    writeInterest = true;
+                }
+                return isReady;
+            }
+        }
+
+
+        @Override
         public int read(boolean block, byte[] b, int off, int len) throws IOException {
             checkError();
 
@@ -847,7 +879,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
                         readInterest = true;
                     }
                 }
-
                 return nRead;
             }
         }
@@ -903,7 +934,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
 
         @Override
         public boolean hasAsyncIO() {
-            return false;
+            return true;
         }
 
         /**
@@ -1387,9 +1418,8 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
         @Override
         public boolean hasDataToRead() {
             synchronized (readCompletionHandler) {
-                return !socketBufferHandler.isReadBufferEmpty() ||
-                        readNotify ||
-                        getError() != null;
+                return !socketBufferHandler.isReadBufferEmpty()
+                        || readNotify || getError() != null;
             }
         }
 
@@ -1397,8 +1427,8 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
         @Override
         public boolean hasDataToWrite() {
             synchronized (writeCompletionHandler) {
-                return !socketBufferHandler.isWriteBufferEmpty() || writeNotify ||
-                        !nonBlockingWriteBuffer.isEmpty() || getError() != null;
+                return !socketBufferHandler.isWriteBufferEmpty() || !nonBlockingWriteBuffer.isEmpty()
+                        || writeNotify || writePending.availablePermits() == 0 || getError() != null;
             }
         }
 
@@ -1412,39 +1442,46 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
 
 
         @Override
-        public boolean awaitReadComplete(long timeout, TimeUnit unit) {
-            try {
-                if (readPending.tryAcquire(timeout, unit)) {
-                    readPending.release();
-                }
-            } catch (InterruptedException e) {
-                return false;
+        public boolean isWritePending() {
+            synchronized (writeCompletionHandler) {
+                return writePending.availablePermits() == 0;
             }
-            return true;
         }
 
 
         @Override
-        public boolean awaitWriteComplete(long timeout, TimeUnit unit) {
-            try {
-                if (writePending.tryAcquire(timeout, unit)) {
-                    writePending.release();
+        public boolean awaitReadComplete(long timeout, TimeUnit unit) {
+            synchronized (readCompletionHandler) {
+                try {
+                    if (readNotify) {
+                        return true;
+                    } else if (readPending.tryAcquire(timeout, unit)) {
+                        readPending.release();
+                        return true;
+                    } else {
+                        return false;
+                    }
+                } catch (InterruptedException e) {
+                    return false;
                 }
-            } catch (InterruptedException e) {
-                return false;
             }
-            return true;
         }
 
-        /*
-         * This should only be called from a thread that currently holds a lock
-         * on the socket. This prevents a race condition between a pending read
-         * being completed and processed and a thread triggering a new read.
-         */
-        void releaseReadPending() {
-            synchronized (readCompletionHandler) {
-                if (readPending.availablePermits() == 0) {
-                    readPending.release();
+
+        @Override
+        public boolean awaitWriteComplete(long timeout, TimeUnit unit) {
+            synchronized (writeCompletionHandler) {
+                try {
+                    if (writeNotify) {
+                        return true;
+                    } else if (writePending.tryAcquire(timeout, unit)) {
+                        writePending.release();
+                        return true;
+                    } else {
+                        return false;
+                    }
+                } catch (InterruptedException e) {
+                    return false;
                 }
             }
         }
@@ -1459,12 +1496,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
                 }
                 readInterest = true;
                 if (readPending.tryAcquire()) {
-                    // No read pending, so await bytes
+                    // No read pending, so do a read
                     try {
                         if (fillReadBuffer(false) > 0) {
-                            // Special case where the read completes inline, there is no notification
-                            // in that case and it cannot happen elsewhere
-                            getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, true);
+                            // Special case where the read completed inline, there is no notification
+                            // in that case so it has to be done here
+                            getEndpoint().processSocket(this, SocketEvent.OPEN_READ, true);
                         }
                     } catch (IOException e) {
                         // Will never happen
@@ -1484,7 +1521,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
                 }
                 writeInterest = true;
                 if (writePending.availablePermits() == 1) {
-                    // If no write is pending, notify
+                    // If no write is pending, notify that writing is possible
                     getEndpoint().processSocket(this, SocketEvent.OPEN_WRITE, true);
                 }
             }
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index df84510..c3097d6 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -953,6 +953,7 @@ public abstract class SocketWrapperBase<E> {
      *  <code>false</code> if the operation is still pending and
      *  the specified timeout has passed
      */
+    @Deprecated
     public boolean awaitReadComplete(long timeout, TimeUnit unit) {
         return true;
     }
@@ -967,6 +968,7 @@ public abstract class SocketWrapperBase<E> {
      *  <code>false</code> if the operation is still pending and
      *  the specified timeout has passed
      */
+    @Deprecated
     public boolean awaitWriteComplete(long timeout, TimeUnit unit) {
         return true;
     }
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 86ec3a7..fc04dd7 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -90,6 +90,9 @@
       <fix>
         Refactor NIO2 write pending strategy for the classic IO API. (remm)
       </fix>
+      <fix>
+        Harmonize NIO2 isReadyForWrite with isReadyForRead code. (remm)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Jasper">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to