On Fri, Jul 14, 2017 at 9:52 PM, Yann Ylavic <[email protected]> wrote:
>
> So overall, this patch may introduce the need for more workers than
> before, what was (wrongly) done by the listener thread has to be done
> somewhere anyway...

That patch didn't work (as reported by Stefan Priebe) and I now don't
feel the need to debug it further, see below.

>
> Finally, I think there is room for improvements like batching
> shutdowns in the same worker if there is no objection on the approach
> so far.

That's the way to go IMO, here is a new patch which is much simpler
and effective I think.

The idea is that when nonblocking is required (i.e. in the listener),
connections to flush and close are atomically pushed/popped to/from a
chain (linked list) by the listener/some worker.

So start_lingering_close_nonblocking() simply fills the chain (this is
atomic/nonblocking), and any worker thread which is done with its
current connection will empty the chain while calling
start_lingering_close_blocking() for each connection.

To prevent starvation of deferred lingering closes, the listener may
create a worker at the end of its loop, when/if the chain is (fully)
filled.

While the previous patch potentially induced some overhead in the
number of workers and thread context switches, I think this new one is
much better in this regard.

What do you think of it?
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c	(revision 1802058)
+++ server/mpm/event/event.c	(working copy)
@@ -219,6 +219,12 @@ static apr_pollfd_t *listener_pollfd;
  */
 static apr_pollset_t *event_pollset;
 
+/*
+ * The chain of connections to be shutdown by a worker thread (deferred),
+ * linked list updated atomically.
+ */
+static event_conn_state_t *volatile defer_linger_chain = NULL;
+
 struct event_conn_state_t {
     /** APR_RING of expiration timeouts */
     APR_RING_ENTRY(event_conn_state_t) timeout_list;
@@ -243,7 +249,10 @@ struct event_conn_state_t {
     apr_pollfd_t pfd;
     /** public parts of the connection state */
     conn_state_t pub;
+    /** chaining in defer_linger_chain */
+    struct event_conn_state_t *chain;
 };
+
 APR_RING_HEAD(timeout_head_t, event_conn_state_t);
 
 struct timeout_queue {
@@ -805,31 +814,27 @@ static int start_lingering_close_blocking(event_co
 }
 
 /*
- * Close our side of the connection, NOT flushing data to the client.
- * This should only be called if there has been an error or if we know
- * that our send buffers are empty.
+ * Defer flush and close of the connection by adding it to defer_linger_chain,
+ * for a worker to grab it and do the job (should that be blocking).
  * Pre-condition: cs is not in any timeout queue and not in the pollset,
  *                timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- *         1 if connection is lingering
- * may be called by listener thread
+ * return: 1 connection is alive (but aside and about to linger)
+ * May be called by listener thread.
  */
 static int start_lingering_close_nonblocking(event_conn_state_t *cs)
 {
-    conn_rec *c = cs->c;
-    apr_socket_t *csd = cs->pfd.desc.s;
-
-    if (ap_prep_lingering_close(c)
-        || c->aborted
-        || ap_shutdown_conn(c, 0) != APR_SUCCESS || c->aborted
-        || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS) {
-        apr_socket_close(csd);
-        ap_push_pool(worker_queue_info, cs->p);
-        if (dying)
-            ap_queue_interrupt_one(worker_queue);
-        return 0;
+    event_conn_state_t *chain;
+    for (;;) {
+        /* Atomically exchange defer_linger_chain with cs which is
+         * the new head (i.e. chained to previous defer_linger_chain).
+         */
+        chain = cs->chain = defer_linger_chain;
+        if (apr_atomic_casptr((void *)&defer_linger_chain,
+                              cs, chain) == chain) {
+            break;
+        }
     }
-    return start_lingering_close_common(cs, 0);
+    return 1;
 }
 
 /*
@@ -837,6 +842,8 @@ static int start_lingering_close_nonblocking(event
  * expired
  * Pre-condition: cs is not in any timeout queue and not in the pollset
  * return: irrelevant (need same prototype as start_lingering_close)
+ * Note: since lingering, the socket timeout is set to zero, hence we are
+ * nonblocking here.
  */
 static int stop_lingering_close(event_conn_state_t *cs)
 {
@@ -1287,26 +1294,31 @@ static apr_status_t push_timer2worker(timer_event_
 }
 
 /*
- * Pre-condition: pfd->cs is neither in pollset nor timeout queue
+ * Pre-condition: cs is neither in event_pollset nor a timeout queue
  * this function may only be called by the listener
  */
-static apr_status_t push2worker(const apr_pollfd_t * pfd,
-                                apr_pollset_t * pollset)
+static apr_status_t push2worker(event_conn_state_t *cs)
 {
-    listener_poll_type *pt = (listener_poll_type *) pfd->client_data;
-    event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
     apr_status_t rc;
 
-    rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p);
+    if (cs) {
+        rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p);
+    }
+    else {
+        rc = ap_queue_push(worker_queue, NULL, NULL, NULL);
+    }
     if (rc != APR_SUCCESS) {
-        /* trash the connection; we couldn't queue the connected
-         * socket to a worker
-         */
-        apr_bucket_alloc_destroy(cs->bucket_alloc);
-        apr_socket_close(cs->pfd.desc.s);
-        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                     ap_server_conf, APLOGNO(00471) "push2worker: ap_queue_push failed");
-        ap_push_pool(worker_queue_info, cs->p);
+        if (cs) {
+            /* trash the connection; we couldn't queue the connected
+             * socket to a worker
+             */
+            apr_bucket_alloc_destroy(cs->bucket_alloc);
+            apr_socket_close(cs->pfd.desc.s);
+            ap_push_pool(worker_queue_info, cs->p);
+        }
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, APLOGNO(00471)
+                     "push2worker: ap_queue_push failed");
+        signal_threads(ST_GRACEFUL);
     }
 
     return rc;
@@ -1861,6 +1873,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                     TO_QUEUE_REMOVE(remove_from_q, cs);
                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
                     apr_thread_mutex_unlock(timeout_mutex);
+                    TO_QUEUE_ELEM_INIT(cs);
+
                     /*
                      * Some of the pollset backends, like KQueue or Epoll
                      * automagically remove the FD if the socket is closed,
@@ -1874,7 +1888,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                         break;
                     }
 
-                    TO_QUEUE_ELEM_INIT(cs);
                     /* If we didn't get a worker immediately for a keep-alive
                      * request, we close the connection, so that the client can
                      * re-connect to a different process.
@@ -1881,22 +1894,17 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                      */
                     if (!have_idle_worker) {
                         start_lingering_close_nonblocking(cs);
-                        break;
                     }
-                    rc = push2worker(out_pfd, event_pollset);
-                    if (rc != APR_SUCCESS) {
-                        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                     ap_server_conf, APLOGNO(03095)
-                                     "push2worker failed");
-                    }
-                    else {
+                    else if (push2worker(cs) == APR_SUCCESS) {
                         have_idle_worker = 0;
                     }
                     break;
+
                 case CONN_STATE_LINGER_NORMAL:
                 case CONN_STATE_LINGER_SHORT:
                     process_lingering_close(cs, out_pfd);
                     break;
+
                 default:
                     ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
                                  ap_server_conf, APLOGNO(03096)
@@ -2090,6 +2098,22 @@ static void * APR_THREAD_FUNC listener_thread(apr_
             ps->keep_alive = 0;
         }
 
+        /* If there are some lingering closes to defer (to a worker), schedule
+         * them now. We might wakeup a worker spuriously if another one empties
+         * defer_linger_chain in the meantime, but there also may be no active
+         * or all busy workers for an undefined time.  In any case a deferred
+         * lingering close can't starve if we do that here since the chain is
+         * filled only above in the listener and it's emptied only in the
+         * worker(s); thus a NULL here means it will stay so while the listener
+         * waits (possibly indefinitely) in poll().
+         */
+        if (defer_linger_chain) {
+            get_worker(&have_idle_worker, 0, &workers_were_busy);
+            if (have_idle_worker && push2worker(NULL) == APR_SUCCESS) {
+                have_idle_worker = 0;
+            }
+        }
+
         if (listeners_disabled && !workers_were_busy
             && ((c_count = apr_atomic_read32(&connection_count))
                     >= (l_count = apr_atomic_read32(&lingering_count))
@@ -2167,7 +2191,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thr
 
     while (!workers_may_exit) {
         apr_socket_t *csd = NULL;
-        event_conn_state_t *cs;
+        event_conn_state_t *cs = NULL;
         timer_event_t *te = NULL;
         apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
 
@@ -2233,12 +2257,33 @@ static void *APR_THREAD_FUNC worker_thread(apr_thr
                 apr_thread_mutex_unlock(g_timer_skiplist_mtx);
             }
         }
-        else {
+        else if (ptrans != NULL) {
             is_idle = 0;
             worker_sockets[thread_slot] = csd;
             process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
             worker_sockets[thread_slot] = NULL;
         }
+
+        /* If there are deferred shutdowns, handle them now. */
+        for (;;) {
+            cs = defer_linger_chain;
+            if (!cs) {
+                break;
+            }
+
+            /* Atomically exchange defer_linger_chain with its chained
+             * entry (next), we can then shutdown the removed one (in cs).
+             */
+            if (apr_atomic_casptr((void *)&defer_linger_chain,
+                                  cs->chain, cs) == cs) {
+                cs->chain = NULL;
+                cs->pub.state = CONN_STATE_LINGER;
+                csd = ap_get_conn_socket(cs->c);
+                worker_sockets[thread_slot] = csd;
+                start_lingering_close_blocking(cs);
+                worker_sockets[thread_slot] = NULL;
+            }
+        }
     }
 
     ap_update_child_status_from_indexes(process_slot, thread_slot,

Reply via email to