This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b5ad1b  Refactor write notifications
1b5ad1b is described below

commit 1b5ad1bacf6af53c33e85172c34ca020743faf04
Author: remm <r...@apache.org>
AuthorDate: Tue Mar 19 11:19:29 2019 +0100

    Refactor write notifications
    
    Use the same model as read notifications, and simplify. Nesting should
    normally not be an issue: the code should ensure exactly one
    notification when the write ends, and none until at least the next
    write. This is an attempt to fix a leftover, very rare CI failure in
    TestCoyoteOutputStream. One possible issue right now is the write
    semaphore release, so it needs some testing.
    Temporarily disable the async IO API: it does not use notifications at
    all and is more robust by design, so testing HTTP/2 and WebSocket
    writes with the regular API is more useful.
---
 java/org/apache/tomcat/util/net/Nio2Endpoint.java | 73 ++++++++++++-----------
 1 file changed, 39 insertions(+), 34 deletions(-)

diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index c49c209..689c98b 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -39,7 +39,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLSession;
@@ -111,6 +110,7 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
 
     // --------------------------------------------------------- Public Methods
 
+
     /**
      * Number of keep-alive sockets.
      *
@@ -463,14 +463,6 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
 
     public static class Nio2SocketWrapper extends 
SocketWrapperBase<Nio2Channel> {
 
-        private static final ThreadLocal<AtomicInteger> 
nestedWriteCompletionCount =
-                new ThreadLocal<AtomicInteger>() {
-            @Override
-            protected AtomicInteger initialValue() {
-                return new AtomicInteger(0);
-            }
-        };
-
         private SendfileData sendfileData = null;
 
         private final CompletionHandler<Integer, ByteBuffer> 
readCompletionHandler;
@@ -619,37 +611,38 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                 @Override
                 public void completed(Integer nBytes, ByteBuffer attachment) {
                     writeNotify = false;
+                    boolean notify = false;
                     synchronized (writeCompletionHandler) {
                         if (nBytes.intValue() < 0) {
                             failed(new 
EOFException(sm.getString("iob.failedwrite")), attachment);
                         } else if (!nonBlockingWriteBuffer.isEmpty()) {
-                            nestedWriteCompletionCount.get().incrementAndGet();
                             // Continue writing data using a gathering write
                             ByteBuffer[] array = 
nonBlockingWriteBuffer.toArray(attachment);
                             getSocket().write(array, 0, array.length,
                                     toNio2Timeout(getWriteTimeout()), 
TimeUnit.MILLISECONDS,
                                     array, gatheringWriteCompletionHandler);
-                            nestedWriteCompletionCount.get().decrementAndGet();
                         } else if (attachment.hasRemaining()) {
                             // Regular write
-                            nestedWriteCompletionCount.get().incrementAndGet();
                             getSocket().write(attachment, 
toNio2Timeout(getWriteTimeout()),
                                     TimeUnit.MILLISECONDS, attachment, 
writeCompletionHandler);
-                            nestedWriteCompletionCount.get().decrementAndGet();
                         } else {
                             // All data has been written
-                            if (writeInterest) {
-                                writeInterest = false;
+                            if (writeInterest && !Nio2Endpoint.isInline()) {
                                 writeNotify = true;
+                                // Set extra flag so that write nesting does 
not cause multiple notifications
+                                notify = true;
+                            } else {
+                                // Release here since there will be no
+                                // notify/dispatch to do the release.
+                                writePending.release();
                             }
-                            writePending.release();
+                            writeInterest = false;
                         }
                     }
-                    if (writeNotify && nestedWriteCompletionCount.get().get() 
== 0) {
-                        endpoint.processSocket(Nio2SocketWrapper.this, 
SocketEvent.OPEN_WRITE, Nio2Endpoint.isInline());
+                    if (notify) {
+                        endpoint.processSocket(Nio2SocketWrapper.this, 
SocketEvent.OPEN_WRITE, true);
                     }
                 }
-
                 @Override
                 public void failed(Throwable exc, ByteBuffer attachment) {
                     IOException ioe;
@@ -668,31 +661,34 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                 @Override
                 public void completed(Long nBytes, ByteBuffer[] attachment) {
                     writeNotify = false;
+                    boolean notify = false;
                     synchronized (writeCompletionHandler) {
                         if (nBytes.longValue() < 0) {
                             failed(new 
EOFException(sm.getString("iob.failedwrite")), attachment);
                         } else if (!nonBlockingWriteBuffer.isEmpty() || 
arrayHasData(attachment)) {
                             // Continue writing data using a gathering write
-                            nestedWriteCompletionCount.get().incrementAndGet();
                             ByteBuffer[] array = 
nonBlockingWriteBuffer.toArray(attachment);
                             getSocket().write(array, 0, array.length,
                                     toNio2Timeout(getWriteTimeout()), 
TimeUnit.MILLISECONDS,
                                     array, gatheringWriteCompletionHandler);
-                            nestedWriteCompletionCount.get().decrementAndGet();
                         } else {
                             // All data has been written
-                            if (writeInterest) {
-                                writeInterest = false;
+                            if (writeInterest && !Nio2Endpoint.isInline()) {
                                 writeNotify = true;
+                                // Set extra flag so that write nesting does 
not cause multiple notifications
+                                notify = true;
+                            } else {
+                                // Release here since there will be no
+                                // notify/dispatch to do the release.
+                                writePending.release();
                             }
-                            writePending.release();
+                            writeInterest = false;
                         }
                     }
-                    if (writeNotify && nestedWriteCompletionCount.get().get() 
== 0) {
-                        endpoint.processSocket(Nio2SocketWrapper.this, 
SocketEvent.OPEN_WRITE, Nio2Endpoint.isInline());
+                    if (notify) {
+                        endpoint.processSocket(Nio2SocketWrapper.this, 
SocketEvent.OPEN_WRITE, true);
                     }
                 }
-
                 @Override
                 public void failed(Throwable exc, ByteBuffer[] attachment) {
                     IOException ioe;
@@ -934,7 +930,7 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
 
         @Override
         public boolean hasAsyncIO() {
-            return true;
+            return false;
         }
 
         /**
@@ -1053,6 +1049,8 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
             if (timeout == -1) {
                 timeout = toNio2Timeout(getWriteTimeout());
             }
+            // Disable any regular write notifications caused by 
registerWriteInterest
+            writeNotify = true;
             if (block != BlockingMode.NON_BLOCK) {
                 try {
                     if (!writePending.tryAcquire(timeout, unit)) {
@@ -1253,7 +1251,7 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
             // indicate the end of a write
             // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS))
             synchronized (writeCompletionHandler) {
-                if (writePending.tryAcquire()) {
+                if (writeNotify || writePending.tryAcquire()) {
                     // No pending completion handler, so writing to the main 
buffer
                     // is possible
                     socketBufferHandler.configureWriteBufferForWrite();
@@ -1303,7 +1301,7 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
             // indicate the end of a write
             // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS))
             synchronized (writeCompletionHandler) {
-                if (writePending.tryAcquire()) {
+                if (writeNotify || writePending.tryAcquire()) {
                     // No pending completion handler, so writing to the main 
buffer
                     // is possible
                     socketBufferHandler.configureWriteBufferForWrite();
@@ -1383,7 +1381,9 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
         private boolean flushNonBlocking(boolean hasPermit) throws IOException 
{
             checkError();
             synchronized (writeCompletionHandler) {
-                if (hasPermit || writePending.tryAcquire()) {
+                if (writeNotify || hasPermit || writePending.tryAcquire()) {
+                    // The code that was notified is now writing its data
+                    writeNotify = false;
                     socketBufferHandler.configureWriteBufferForRead();
                     if (!nonBlockingWriteBuffer.isEmpty()) {
                         ByteBuffer[] array = 
nonBlockingWriteBuffer.toArray(socketBufferHandler.getWriteBuffer());
@@ -1403,6 +1403,7 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                         if (!hasPermit) {
                             writePending.release();
                         }
+                        writeInterest = false;
                     }
                 }
                 return hasDataToWrite();
@@ -1424,6 +1425,7 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
         public boolean hasDataToWrite() {
             synchronized (writeCompletionHandler) {
                 return !socketBufferHandler.isWriteBufferEmpty() ||
+                        writeNotify ||
                         !nonBlockingWriteBuffer.isEmpty() ||
                         writePending.availablePermits() == 0 ||
                         getError() != null;
@@ -1510,9 +1512,12 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
         @Override
         public void registerWriteInterest() {
             synchronized (writeCompletionHandler) {
-                if (writePending.availablePermits() == 0) {
-                    writeInterest = true;
-                } else {
+                // A notification is already being sent
+                if (writeNotify) {
+                    return;
+                }
+                writeInterest = true;
+                if (writePending.availablePermits() == 1) {
                     // If no write is pending, notify
                     getEndpoint().processSocket(this, SocketEvent.OPEN_WRITE, 
true);
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to