On Wed, Nov 17, 2021 at 5:19 PM Mladen Turk <mt...@apache.org> wrote:
>
> On 17/11/2021 16:53, Yann Ylavic wrote:
>
> > apr_poll_drain_wakeup_pipe() should consume each byte sent on the pipe
> > corresponding to a wakeup_set flip.
> >
>
> Yes, it's basically just one byte and one call to apr_file_getc().
> The 'while (apr_atomic_cas32(wakeup_set, 0, 1) > 0)' is there
> because some thread might call apr_pollset_wakeup while OS is
> inside write call. Anyhow, this ensures everything is pulled
> without blocking since non zero wakeup_set means there is
> something to read.

Possibly we could use lighter atomics to achieve the same, something
like the attached patch.
There is still the compare-and-swap on wakeup_set in apr_pollset_wakeup(),
so that only one byte is written to the pipe per wakeup_set flip, but
apr_poll_drain_wakeup_pipe() now unconditionally consumes one byte from
the pipe and resets wakeup_set just after with an atomic set.
Since we reach apr_poll_drain_wakeup_pipe() only when the system's
poll() triggers on the pipe, there is something on the pipe to read
(and being non-blocking is not an issue either).
If a new wakeup happens just after apr_poll_drain_wakeup_pipe() resets
wakeup_set, well it's for the next turn (no different than with the
current version..).

The advantage here is that we'd have a single "real" atomic operation
now for the wakeup (the atomic cas); draining would be as light as an
atomic set (i.e. a simple volatile access on platforms with non-weak
memory ordering, which is most of them).

WDYT?

Regards,
Yann.
Index: poll/os2/pollset.c
===================================================================
--- poll/os2/pollset.c	(revision 1895110)
+++ poll/os2/pollset.c	(working copy)
@@ -263,17 +263,14 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pol
 
         if (rtnevents) {
             if (i == 0 && pollset->wake_listen != NULL) {
+                char ch;
+                apr_size_t len = 1;
                 struct apr_sockaddr_t from_addr;
-                char buffer[16];
-                apr_size_t buflen;
-                for (;;) {
-                    buflen = sizeof(buffer);
-                    rv = apr_socket_recvfrom(&from_addr, pollset->wake_listen,
-                                             MSG_DONTWAIT, buffer, &buflen);
-                    if (rv != APR_SUCCESS) {
-                        break;
-                    }
-                    /* Woken up, drain the pipe still. */
+                rv = apr_socket_recvfrom(&from_addr, pollset->wake_listen,
+                                          MSG_DONTWAIT, &ch, &len);
+                if (rv == APR_SUCCESS) {
+                    /* Woken up, senders can fill the pipe again */
+                    apr_atomic_set32(&pollset->wakeup_set, 0);
                     rc = APR_EINTR;
                 }
             }
@@ -298,12 +295,15 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pol
 
 APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
 {
-    if (pollset->wake_sender) {
+    if (!pollset->wake_sender)
+        return APR_EINIT;
+
+    if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0) {
         apr_size_t len = 1;
         return apr_socket_sendto(pollset->wake_sender, pollset->wake_address, 0, "", &len);
     }
 
-    return APR_EINIT;
+    return APR_SUCCESS;
 }
 
 
Index: poll/unix/pollcb.c
===================================================================
--- poll/unix/pollcb.c	(revision 1895110)
+++ poll/unix/pollcb.c	(working copy)
@@ -214,14 +214,13 @@ APR_DECLARE(apr_status_t) apr_pollcb_poll(apr_poll
 
 APR_DECLARE(apr_status_t) apr_pollcb_wakeup(apr_pollcb_t *pollcb)
 {
-    if (pollcb->flags & APR_POLLSET_WAKEABLE) {
-        if (apr_atomic_cas32(&pollcb->wakeup_set, 1, 0) == 0)
-            return apr_file_putc(1, pollcb->wakeup_pipe[1]);
-        else
-           return APR_SUCCESS;
-    }
-    else
+    if (!(pollcb->flags & APR_POLLSET_WAKEABLE))
         return APR_EINIT;
+
+    if (apr_atomic_cas32(&pollcb->wakeup_set, 1, 0) == 0)
+        return apr_file_putc(1, pollcb->wakeup_pipe[1]);
+
+    return APR_SUCCESS;
 }
 
 APR_DECLARE(const char *) apr_pollcb_method_name(apr_pollcb_t *pollcb)
Index: poll/unix/pollset.c
===================================================================
--- poll/unix/pollset.c	(revision 1895110)
+++ poll/unix/pollset.c	(working copy)
@@ -218,14 +218,13 @@ APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_
 
 APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
 {
-    if (pollset->flags & APR_POLLSET_WAKEABLE) {
-        if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0)
-            return apr_file_putc(1, pollset->wakeup_pipe[1]);
-        else
-           return APR_SUCCESS;
-    }
-    else
+    if (!(pollset->flags & APR_POLLSET_WAKEABLE))
         return APR_EINIT;
+
+    if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0)
+        return apr_file_putc(1, pollset->wakeup_pipe[1]);
+
+    return APR_SUCCESS;
 }
 
 APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset,
Index: poll/unix/wakeup.c
===================================================================
--- poll/unix/wakeup.c	(revision 1895111)
+++ poll/unix/wakeup.c	(working copy)
@@ -81,8 +81,8 @@ apr_status_t apr_poll_create_wakeup_pipe(apr_pool_
 {
     apr_status_t rv;
 
-    if ((rv = apr_file_pipe_create(&wakeup_pipe[0], &wakeup_pipe[1],
-                                   pool)) != APR_SUCCESS)
+    if ((rv = apr_file_pipe_create_ex(&wakeup_pipe[0], &wakeup_pipe[1],
+                                      APR_WRITE_BLOCK, pool)))
         return rv;
 
     pfd->p = pool;
@@ -137,16 +137,9 @@ apr_status_t apr_poll_close_wakeup_pipe(apr_file_t
  */
 void apr_poll_drain_wakeup_pipe(volatile apr_uint32_t *wakeup_set, apr_file_t **wakeup_pipe)
 {
+    char ch;
 
-    while (apr_atomic_cas32(wakeup_set, 0, 1) > 0) {
-        char ch;
-        /* though we write just one byte to the other end of the pipe
-         * during wakeup, multiple threads could call the wakeup.
-         * So simply drain out from the input side of the pipe all
-         * the data.
-         */
-        if (apr_file_getc(&ch, wakeup_pipe[0]) != APR_SUCCESS)
-            break;
-    }
+    (void)apr_file_getc(&ch, wakeup_pipe[0]);
+    apr_atomic_set32(wakeup_set, 0);
 }
 

Reply via email to