Attached is a patch that fixes those huge writes to the drain pipe.

Tested on CentOS 8; all tests pass.

On 17/11/2021 08:08, Ruediger Pluem wrote:


On 11/17/21 2:39 AM, Mladen Turk wrote:


On 16/11/2021 12:00, Yann Ylavic wrote:
On Wed, Nov 10, 2021 at 4:19 PM Yann Ylavic <ylavic....@gmail.com> wrote:

Otherwise I'll revert because I have no way to test it, but I think
that apr_poll_drain_wakeup_pipe() might block on Windows for the same
reason as r1894914 for platforms with poll()able pipes.

Reverted in 1.7.x, I'll leave it in trunk for now..


OK, did some digging.

First, it's an abomination that 512+ threads call apr_pollset_wakeup
for the same pollset.

When I wrote that wakeup logic, 512 bytes in drain_wakeup_pipe was a
quick hack to make it work without adding 'already sent wakeup for this pollset'.

Here is the original comment:
/* Although we write just one byte to the other end of the pipe
  * during wakeup, multiple threads could call the wakeup.
  * So simply drain out from the input side of the pipe all
  * the data.
  */

So it seems it's time to do it properly :)

Basically the 'apr_file_putc(1, pollset->wakeup_pipe[1])' in
apr_pollset_wakeup should be called only if not already called
for that pollset.

pollset should have something like 'send_wakeup' member
that will be reset by apr_poll_drain_wakeup_pipe call.

The problem is how to make it thread-safe.

How about using an int flag that is set / read with atomic operations, e.g.
apr_atomic_cas32?
Atomic operations should not add much of a performance penalty.

Regards

Rüdiger


Regards
--
^TM
Index: include/arch/unix/apr_arch_poll_private.h
===================================================================
--- include/arch/unix/apr_arch_poll_private.h   (revision 1895102)
+++ include/arch/unix/apr_arch_poll_private.h   (working copy)
@@ -123,6 +123,7 @@
     /* Pipe descriptors used for wakeup */
     apr_file_t *wakeup_pipe[2];
     apr_pollfd_t wakeup_pfd;
+    volatile apr_uint32_t wakeup_set;
     apr_pollset_private_t *p;
     const apr_pollset_provider_t *provider;
 };
@@ -151,6 +152,7 @@
     /* Pipe descriptors used for wakeup */
     apr_file_t *wakeup_pipe[2];
     apr_pollfd_t wakeup_pfd;
+    volatile apr_uint32_t wakeup_set;
     int fd;
     apr_pollcb_pset pollset;
     apr_pollfd_t **copyset;
@@ -182,6 +184,6 @@
 apr_status_t apr_poll_create_wakeup_pipe(apr_pool_t *pool, apr_pollfd_t *pfd, 
                                          apr_file_t **wakeup_pipe);
 apr_status_t apr_poll_close_wakeup_pipe(apr_file_t **wakeup_pipe);
-void apr_poll_drain_wakeup_pipe(apr_file_t **wakeup_pipe);
+void apr_poll_drain_wakeup_pipe(volatile apr_uint32_t *wakeup_set, apr_file_t 
**wakeup_pipe);
 
 #endif /* APR_ARCH_POLL_PRIVATE_H */
Index: poll/unix/epoll.c
===================================================================
--- poll/unix/epoll.c   (revision 1895102)
+++ poll/unix/epoll.c   (working copy)
@@ -289,7 +289,7 @@
             if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
                 fdptr->desc_type == APR_POLL_FILE &&
                 fdptr->desc.f == pollset->wakeup_pipe[0]) {
-                apr_poll_drain_wakeup_pipe(pollset->wakeup_pipe);
+                apr_poll_drain_wakeup_pipe(&pollset->wakeup_set, 
pollset->wakeup_pipe);
                 rv = APR_EINTR;
             }
             else {
@@ -460,7 +460,7 @@
             if ((pollcb->flags & APR_POLLSET_WAKEABLE) &&
                 pollfd->desc_type == APR_POLL_FILE &&
                 pollfd->desc.f == pollcb->wakeup_pipe[0]) {
-                apr_poll_drain_wakeup_pipe(pollcb->wakeup_pipe);
+                apr_poll_drain_wakeup_pipe(&pollcb->wakeup_set, 
pollcb->wakeup_pipe);
                 return APR_EINTR;
             }
 
Index: poll/unix/kqueue.c
===================================================================
--- poll/unix/kqueue.c  (revision 1895102)
+++ poll/unix/kqueue.c  (working copy)
@@ -286,7 +286,7 @@
             if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
                 fd->desc_type == APR_POLL_FILE &&
                 fd->desc.f == pollset->wakeup_pipe[0]) {
-                apr_poll_drain_wakeup_pipe(pollset->wakeup_pipe);
+                apr_poll_drain_wakeup_pipe(&pollset->wakeup_set, 
pollset->wakeup_pipe);
                 rv = APR_EINTR;
             }
             else {
@@ -473,7 +473,7 @@
             if ((pollcb->flags & APR_POLLSET_WAKEABLE) &&
                 pollfd->desc_type == APR_POLL_FILE &&
                 pollfd->desc.f == pollcb->wakeup_pipe[0]) {
-                apr_poll_drain_wakeup_pipe(pollcb->wakeup_pipe);
+                apr_poll_drain_wakeup_pipe(&pollcb->wakeup_set, 
pollcb->wakeup_pipe);
                 return APR_EINTR;
             }
 
Index: poll/unix/poll.c
===================================================================
--- poll/unix/poll.c    (revision 1895102)
+++ poll/unix/poll.c    (working copy)
@@ -275,7 +275,7 @@
                 if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
                     pollset->p->query_set[i].desc_type == APR_POLL_FILE &&
                     pollset->p->query_set[i].desc.f == 
pollset->wakeup_pipe[0]) {
-                    apr_poll_drain_wakeup_pipe(pollset->wakeup_pipe);
+                    apr_poll_drain_wakeup_pipe(&pollset->wakeup_set, 
pollset->wakeup_pipe);
                     rv = APR_EINTR;
                 }
                 else {
@@ -422,7 +422,7 @@
                 if ((pollcb->flags & APR_POLLSET_WAKEABLE) &&
                     pollfd->desc_type == APR_POLL_FILE &&
                     pollfd->desc.f == pollcb->wakeup_pipe[0]) {
-                    apr_poll_drain_wakeup_pipe(pollcb->wakeup_pipe);
+                    apr_poll_drain_wakeup_pipe(&pollcb->wakeup_set, 
pollcb->wakeup_pipe);
                     return APR_EINTR;
                 }
 
Index: poll/unix/pollcb.c
===================================================================
--- poll/unix/pollcb.c  (revision 1895102)
+++ poll/unix/pollcb.c  (working copy)
@@ -23,6 +23,7 @@
 #include "apr_poll.h"
 #include "apr_time.h"
 #include "apr_portable.h"
+#include "apr_atomic.h"
 #include "apr_arch_file_io.h"
 #include "apr_arch_networkio.h"
 #include "apr_arch_poll_private.h"
@@ -134,6 +135,7 @@
     pollcb->flags = flags;
     pollcb->pool = p;
     pollcb->provider = provider;
+    pollcb->wakeup_set = 0;
 
     rv = (*provider->create)(pollcb, size, p, flags);
     if (rv == APR_ENOTIMPL) {
@@ -212,8 +214,12 @@
 
 APR_DECLARE(apr_status_t) apr_pollcb_wakeup(apr_pollcb_t *pollcb)
 {
-    if (pollcb->flags & APR_POLLSET_WAKEABLE)
-        return apr_file_putc(1, pollcb->wakeup_pipe[1]);
+    if (pollcb->flags & APR_POLLSET_WAKEABLE) {
+        if (apr_atomic_cas32(&pollcb->wakeup_set, 1, 0) == 0)
+            return apr_file_putc(1, pollcb->wakeup_pipe[1]);
+        else
+           return APR_SUCCESS;
+    }
     else
         return APR_EINIT;
 }
Index: poll/unix/pollset.c
===================================================================
--- poll/unix/pollset.c (revision 1895102)
+++ poll/unix/pollset.c (working copy)
@@ -23,6 +23,7 @@
 #include "apr_poll.h"
 #include "apr_time.h"
 #include "apr_portable.h"
+#include "apr_atomic.h"
 #include "apr_arch_file_io.h"
 #include "apr_arch_networkio.h"
 #include "apr_arch_poll_private.h"
@@ -140,6 +141,7 @@
     pollset->pool = p;
     pollset->flags = flags;
     pollset->provider = provider;
+    pollset->wakeup_set = 0;
 
     rv = (*provider->create)(pollset, size, p, flags);
     if (rv == APR_ENOTIMPL) {
@@ -216,8 +218,12 @@
 
 APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
 {
-    if (pollset->flags & APR_POLLSET_WAKEABLE)
-        return apr_file_putc(1, pollset->wakeup_pipe[1]);
+    if (pollset->flags & APR_POLLSET_WAKEABLE) {
+        if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0)
+            return apr_file_putc(1, pollset->wakeup_pipe[1]);
+        else
+           return APR_SUCCESS;
+    }
     else
         return APR_EINIT;
 }
Index: poll/unix/port.c
===================================================================
--- poll/unix/port.c    (revision 1895102)
+++ poll/unix/port.c    (working copy)
@@ -411,7 +411,7 @@
         if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
             ep->pfd.desc_type == APR_POLL_FILE &&
             ep->pfd.desc.f == pollset->wakeup_pipe[0]) {
-            apr_poll_drain_wakeup_pipe(pollset->wakeup_pipe);
+            apr_poll_drain_wakeup_pipe(&pollset->wakeup_set, 
pollset->wakeup_pipe);
             rv = APR_EINTR;
         }
         else {
@@ -563,7 +563,7 @@
             if ((pollcb->flags & APR_POLLSET_WAKEABLE) &&
                 pollfd->desc_type == APR_POLL_FILE &&
                 pollfd->desc.f == pollcb->wakeup_pipe[0]) {
-                apr_poll_drain_wakeup_pipe(pollcb->wakeup_pipe);
+                apr_poll_drain_wakeup_pipe(&pollcb->wakeup_set, 
pollcb->wakeup_pipe);
                 return APR_EINTR;
             }
 
Index: poll/unix/select.c
===================================================================
--- poll/unix/select.c  (revision 1895102)
+++ poll/unix/select.c  (working copy)
@@ -401,7 +401,7 @@
         else {
             if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
                 pollset->p->query_set[i].desc.f == pollset->wakeup_pipe[0]) {
-                apr_poll_drain_wakeup_pipe(pollset->wakeup_pipe);
+                apr_poll_drain_wakeup_pipe(&pollset->wakeup_set, 
pollset->wakeup_pipe);
                 rv = APR_EINTR;
                 continue;
             }
Index: poll/unix/wakeup.c
===================================================================
--- poll/unix/wakeup.c  (revision 1895102)
+++ poll/unix/wakeup.c  (working copy)
@@ -18,6 +18,7 @@
 #include "apr_poll.h"
 #include "apr_time.h"
 #include "apr_portable.h"
+#include "apr_atomic.h"
 #include "apr_arch_file_io.h"
 #include "apr_arch_networkio.h"
 #include "apr_arch_poll_private.h"
@@ -36,9 +37,6 @@
                                       pool)) != APR_SUCCESS)
         return rv;
 
-    /* Read end of the pipe is non-blocking */
-    apr_file_pipe_timeout_set(wakeup_pipe[0], 0);
-
     pfd->reqevents = APR_POLLIN;
     pfd->desc_type = APR_POLL_FILE;
     pfd->desc.f = wakeup_pipe[0];
@@ -139,18 +137,18 @@
 
 /* Read and discard whatever is in the wakeup pipe.
  */
-void apr_poll_drain_wakeup_pipe(apr_file_t **wakeup_pipe)
+void apr_poll_drain_wakeup_pipe(volatile apr_uint32_t *wakeup_set, apr_file_t 
**wakeup_pipe)
 {
-    char rb[512];
-    apr_size_t nr = sizeof(rb);
 
-    while (apr_file_read(wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
-        /* Although we write just one byte to the other end of the pipe
+    while (apr_atomic_cas32(wakeup_set, 0, 1) > 0) {
+        char ch;
+        /* though we write just one byte to the other end of the pipe
          * during wakeup, multiple threads could call the wakeup.
          * So simply drain out from the input side of the pipe all
          * the data.
          */
-        if (nr != sizeof(rb))
+        if (apr_file_getc(&ch, wakeup_pipe[0]) != APR_SUCCESS)
             break;
     }
 }
+

Reply via email to