Hi Stefan,

On Tue, Jan 24, 2017 at 1:37 PM, Stefan Eissing
<stefan.eiss...@greenbytes.de> wrote:
> Yann, thanks for the patch. I agree that the cleanups need to be killed in 
> the right place. Not certain if it was wrong before, but that part is not 
> easy to see for every combination.
>
> I did some rework and hope this makes it more readable. If you find the time 
> to look at it, feedback welcome.

I still fear that if beam->pool gets destroyed while both
beam_send_cleanup() and beam_cleanup() are registered, the former is
called twice.

I'd change:
    if (safe_send) {
        if (beam->send_pool && beam->send_pool != beam->pool) {
            apr_pool_cleanup_kill(beam->send_pool, beam, beam_send_cleanup);
        }
        status = beam_send_cleanup(beam);
    }

with:
    if (safe_send) {
        if (beam->send_pool) {
            if (beam->send_pool != beam->pool) {
                apr_pool_cleanup_kill(beam->send_pool, beam, beam_send_cleanup);
            }
            status = beam_send_cleanup(beam);
        }
    }

since in the above case beam_send_cleanup() runs first and sets send_pool = NULL,
so the extra check avoids calling it a second time.
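
For reference, here is a minimal standalone sketch (not part of the patch; the
names are mine, only the apr_pool_*/apr_initialize calls are real APR) of the
hazard: a cleanup that is both registered on a pool and called manually runs
twice once the pool is destroyed, unless it is killed or guarded first:

    #include <stdio.h>
    #include <apr_general.h>
    #include <apr_pools.h>

    static int calls;

    static apr_status_t send_cleanup(void *data)
    {
        (void)data;
        ++calls;              /* count how many times the cleanup runs */
        return APR_SUCCESS;
    }

    int main(void)
    {
        apr_pool_t *pool;

        apr_initialize();
        apr_pool_create(&pool, NULL);

        /* registered on the pool (as beam_send_cleanup is on send_pool)... */
        apr_pool_pre_cleanup_register(pool, NULL, send_cleanup);

        /* ...and also called manually, as beam_cleanup() does in v2, without
         * an apr_pool_cleanup_kill() or a send_pool != NULL guard first */
        send_cleanup(NULL);

        apr_pool_destroy(pool);   /* runs the registered instance as well */
        printf("cleanup ran %d times\n", calls);   /* prints 2 */

        apr_terminate();
        return 0;
    }

With the v3 check, the manual call only happens while send_pool is still set,
so the registered run and the manual run cannot both fire.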

Attached is v3 with only this change w.r.t. v2.
Otherwise, looks good to me, thanks!
Index: modules/http2/h2_bucket_beam.c
===================================================================
--- modules/http2/h2_bucket_beam.c      (revision 1780129)
+++ modules/http2/h2_bucket_beam.c      (working copy)
@@ -438,18 +438,37 @@ static apr_status_t beam_recv_cleanup(void *data)
     return APR_SUCCESS;
 }
 
+static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool, 
+                         apr_status_t (*cleanup)(void *))
+{
+    if (pool && pool != beam->pool) {
+        apr_pool_pre_cleanup_register(pool, beam, cleanup);
+        return 1;
+    }
+    return 0;
+}
+
+static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
+                     apr_status_t (*cleanup)(void *)) {
+    if (pool && pool != beam->pool) {
+        apr_pool_cleanup_kill(pool, beam, cleanup);
+        return 1;
+    }
+    return 0;
+}
+
 static void beam_set_recv_pool(h2_bucket_beam *beam, apr_pool_t *pool) 
 {
-    /* if the beam owner is the sender, monitor receiver pool lifetime */ 
-    if (beam->owner == H2_BEAM_OWNER_SEND && beam->recv_pool != pool) {
-        if (beam->recv_pool) {
-            apr_pool_cleanup_kill(beam->recv_pool, beam, beam_recv_cleanup);
-        }
-        beam->recv_pool = pool;
-        if (beam->recv_pool) {
-            apr_pool_pre_cleanup_register(beam->recv_pool, beam, beam_recv_cleanup);
-        }
+    if (beam->recv_pool == pool || 
+        (beam->recv_pool && pool 
+         && apr_pool_is_ancestor(beam->recv_pool, pool))) {
+        /* when receiver is same or a sub-pool of existing, stick
+         * to the pool we already have. */
+        return;
     }
+    pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
+    beam->recv_pool = pool;
+    pool_register(beam, beam->recv_pool, beam_recv_cleanup);
 }
 
 static apr_status_t beam_send_cleanup(void *data)
@@ -473,22 +492,16 @@ static apr_status_t beam_send_cleanup(void *data)
 
 static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) 
 {
-    /* if the beam owner is the receiver, monitor sender pool lifetime */
-    if (beam->owner == H2_BEAM_OWNER_RECV && beam->send_pool != pool) {
-        if (beam->send_pool && pool 
-            && apr_pool_is_ancestor(beam->send_pool, pool)) {
-            /* when sender uses sub-pools to transmit data, stick
-             * to the lifetime of the pool we already have. */
-             return;
-        }
-        if (beam->send_pool) {
-            apr_pool_cleanup_kill(beam->send_pool, beam, beam_send_cleanup);
-        }
-        beam->send_pool = pool;
-        if (beam->send_pool) {
-            apr_pool_pre_cleanup_register(beam->send_pool, beam, beam_send_cleanup);
-        }
+    if (beam->send_pool == pool || 
+        (beam->send_pool && pool 
+         && apr_pool_is_ancestor(beam->send_pool, pool))) {
+        /* when sender is same or a sub-pool of existing, stick
+         * to the pool we already have. */
+        return;
     }
+    pool_kill(beam, beam->send_pool, beam_send_cleanup);
+    beam->send_pool = pool;
+    pool_register(beam, beam->send_pool, beam_send_cleanup);
 }
 
 static apr_status_t beam_cleanup(void *data)
@@ -495,44 +508,57 @@ static apr_status_t beam_cleanup(void *data)
 {
     h2_bucket_beam *beam = data;
     apr_status_t status = APR_SUCCESS;
-    /* owner of the beam is going away, depending on its role, cleanup
-     * strategies differ. */
-    beam_close(beam);
-    switch (beam->owner) {
-        case H2_BEAM_OWNER_SEND:
-            status = beam_send_cleanup(beam);
-            beam->recv_buffer = NULL;
+    int safe_send = !beam->m_enter || (beam->owner == H2_BEAM_OWNER_SEND);
+    int safe_recv = !beam->m_enter || (beam->owner == H2_BEAM_OWNER_RECV);
+    
+    /* 
+     * Owner of the beam is going away, depending on which side it owns,
+     * cleanup strategies will differ with multi-thread protection
+     * still in place (beam->m_enter).
+     *
+     * In general, receiver holds references to memory from sender. 
+     * Clean up receiver first, if safe, then cleanup sender, if safe.
+     */
+     
+    /* When modify send is not safe, this means we still have multi-thread
+     * protection and the owner is receiving the buckets. If the sending
+     * side has not gone away, this means we could have dangling buckets
+     * in our lists that never get destroyed. This should not happen. */
+    ap_assert(safe_send || !beam->send_pool);
+    if (!H2_BLIST_EMPTY(&beam->send_list)) {
+        ap_assert(beam->send_pool);
+    }
+    
+    if (safe_recv) {
+        if (beam->recv_pool) {
+            apr_pool_cleanup_kill(beam->recv_pool, beam, beam_recv_cleanup);
             beam->recv_pool = NULL;
-            break;
-        case H2_BEAM_OWNER_RECV:
-            if (beam->recv_buffer) {
-                apr_brigade_destroy(beam->recv_buffer);
-            }
+        }
+        if (beam->recv_buffer) {
+            apr_brigade_destroy(beam->recv_buffer);
             beam->recv_buffer = NULL;
-            beam->recv_pool = NULL;
-            if (!H2_BLIST_EMPTY(&beam->send_list)) {
-                ap_assert(beam->send_pool);
-            }
-            if (beam->send_pool) {
-                /* sender has not cleaned up, its pool still lives.
-                 * this is normal if the sender uses cleanup via a bucket
-                 * such as the BUCKET_EOR for requests. In that case, the
-                 * beam should have lost its mutex protection, meaning
-                 * it is no longer used multi-threaded and we can safely
-                 * purge all remaining sender buckets. */
+        }
+    }
+    else {
+        beam->recv_buffer = NULL;
+        beam->recv_pool = NULL;
+    }
+    
+    if (safe_send) {
+        if (beam->send_pool) {
+            if (beam->send_pool != beam->pool) {
                 apr_pool_cleanup_kill(beam->send_pool, beam, beam_send_cleanup);
-                ap_assert(!beam->m_enter);
-                beam_send_cleanup(beam);
             }
-            ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
-            ap_assert(H2_BLIST_EMPTY(&beam->send_list));
-            ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
-            ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
-            break;
-        default:
-            ap_assert(NULL);
-            break;
+            status = beam_send_cleanup(beam);
+        }
     }
+    
+    if (safe_recv) {
+        ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
+        ap_assert(H2_BLIST_EMPTY(&beam->send_list));
+        ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
+        ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
+    }
     return status;
 }
 
Index: modules/proxy/mod_proxy.h
===================================================================
--- modules/proxy/mod_proxy.h   (revision 1780129)
+++ modules/proxy/mod_proxy.h   (working copy)
@@ -347,9 +347,9 @@ PROXY_WORKER_HC_FAIL )
 #define PROXY_WORKER_MAX_SCHEME_SIZE    16
 #define PROXY_WORKER_MAX_ROUTE_SIZE     64
 #define PROXY_BALANCER_MAX_ROUTE_SIZE PROXY_WORKER_MAX_ROUTE_SIZE
-#define PROXY_WORKER_MAX_NAME_SIZE      96
+#define PROXY_WORKER_MAX_NAME_SIZE      544
 #define PROXY_BALANCER_MAX_NAME_SIZE PROXY_WORKER_MAX_NAME_SIZE
-#define PROXY_WORKER_MAX_HOSTNAME_SIZE  64
+#define PROXY_WORKER_MAX_HOSTNAME_SIZE  512
 #define PROXY_BALANCER_MAX_HOSTNAME_SIZE PROXY_WORKER_MAX_HOSTNAME_SIZE
 #define PROXY_BALANCER_MAX_STICKY_SIZE  64
 
Index: modules/slotmem/mod_slotmem_shm.c
===================================================================
--- modules/slotmem/mod_slotmem_shm.c   (revision 1780129)
+++ modules/slotmem/mod_slotmem_shm.c   (working copy)
@@ -386,22 +386,26 @@ static apr_status_t slotmem_create(ap_slotmem_inst
         if (apr_shm_size_get(shm) != size) {
             apr_shm_detach(shm);
             ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf, APLOGNO(02599)
-                         "existing shared memory for %s could not be used (failed size check)",
+                         "existing shared memory for %s could not be reused (failed size check)",
                          fname);
-            return APR_EINVAL;
+            rv = APR_EINVAL;
         }
-        ptr = (char *)apr_shm_baseaddr_get(shm);
-        memcpy(&desc, ptr, sizeof(desc));
-        if (desc.size != item_size || desc.num != item_num) {
-            apr_shm_detach(shm);
-            ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf, APLOGNO(02600)
-                         "existing shared memory for %s could not be used (failed contents check)",
-                         fname);
-            return APR_EINVAL;
+        else {
+            ptr = (char *)apr_shm_baseaddr_get(shm);
+            memcpy(&desc, ptr, sizeof(desc));
+            if (desc.size != item_size || desc.num != item_num) {
+                apr_shm_detach(shm);
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf, APLOGNO(02600)
+                             "existing shared memory for %s could not be reused (failed contents check)",
+                             fname);
+                rv = APR_EINVAL;
+            }
+            else {
+                ptr += AP_SLOTMEM_OFFSET;
+            }
         }
-        ptr += AP_SLOTMEM_OFFSET;
     }
-    else {
+    if (rv != APR_SUCCESS) {
         apr_size_t dsize = size - AP_SLOTMEM_OFFSET;
         if (fbased) {
             apr_shm_remove(fname, gpool);
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c    (revision 1780129)
+++ server/mpm/event/event.c    (working copy)
@@ -177,6 +177,7 @@ static int dying = 0;
 static int workers_may_exit = 0;
 static int start_thread_may_exit = 0;
 static int listener_may_exit = 0;
+static int listener_is_wakeable = 0;        /* Pollset supports APR_POLLSET_WAKEABLE */
 static int num_listensocks = 0;
 static apr_int32_t conns_this_child;        /* MaxConnectionsPerChild, only access
                                                in listener thread */
@@ -199,6 +200,19 @@ module AP_MODULE_DECLARE_DATA mpm_event_module;
 struct event_srv_cfg_s;
 typedef struct event_srv_cfg_s event_srv_cfg;
 
+static apr_pollfd_t *listener_pollfd;
+
+/*
+ * The pollset for sockets that are in any of the timeout queues. Currently
+ * we use the timeout_mutex to make sure that connections are added/removed
+ * atomically to/from both event_pollset and a timeout queue. Otherwise
+ * some confusion can happen under high load if timeout queues and pollset
+ * get out of sync.
+ * XXX: It should be possible to make the lock unnecessary in many or even all
+ * XXX: cases.
+ */
+static apr_pollset_t *event_pollset;
+
 struct event_conn_state_t {
     /** APR_RING of expiration timeouts */
     APR_RING_ENTRY(event_conn_state_t) timeout_list;
@@ -228,9 +242,10 @@ APR_RING_HEAD(timeout_head_t, event_conn_state_t);
 
 struct timeout_queue {
     struct timeout_head_t head;
-    int count, *total;
     apr_interval_time_t timeout;
-    struct timeout_queue *next;
+    apr_uint32_t count;         /* for this queue */
+    apr_uint32_t *total;        /* for all chained/related queues */
+    struct timeout_queue *next; /* chaining */
 };
 /*
  * Several timeout queues that use different timeouts, so that we always can
@@ -244,52 +259,65 @@ static struct timeout_queue *write_completion_q,
                             *keepalive_q,
                             *linger_q,
                             *short_linger_q;
+static volatile apr_time_t  queues_next_expiry;
 
-static apr_pollfd_t *listener_pollfd;
+/* Prevent extra poll/wakeup calls for timeouts close in the future (queues
+ * have the granularity of a second anyway).
+ * XXX: Wouldn't 0.5s (instead of 0.1s) be "enough"?
+ */
+#define TIMEOUT_FUDGE_FACTOR apr_time_from_msec(100)
 
 /*
  * Macros for accessing struct timeout_queue.
  * For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held.
  */
-#define TO_QUEUE_APPEND(q, el)                                                \
-    do {                                                                      \
-        APR_RING_INSERT_TAIL(&(q)->head, el, event_conn_state_t,              \
-                             timeout_list);                                   \
-        ++*(q)->total;                                                        \
-        ++(q)->count;                                                         \
-    } while (0)
+static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
+{
+    apr_time_t q_expiry;
+    apr_time_t next_expiry;
 
-#define TO_QUEUE_REMOVE(q, el)                                                \
-    do {                                                                      \
-        APR_RING_REMOVE(el, timeout_list);                                    \
-        --*(q)->total;                                                        \
-        --(q)->count;                                                         \
-    } while (0)
+    APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
+    apr_atomic_inc32(q->total);
+    ++q->count;
 
-#define TO_QUEUE_INIT(q, p, t, v)                                             \
-    do {                                                                      \
-        struct timeout_queue *b = (v);                                        \
-        (q) = apr_palloc((p), sizeof *(q));                                   \
-        APR_RING_INIT(&(q)->head, event_conn_state_t, timeout_list);          \
-        (q)->total = (b) ? (b)->total : apr_pcalloc((p), sizeof *(q)->total); \
-        (q)->count = 0;                                                       \
-        (q)->timeout = (t);                                                   \
-        (q)->next = NULL;                                                     \
-    } while (0)
+    /* Cheaply update the overall queues' next expiry according to the
+     * first entry of this queue (oldest), if necessary.
+     */
+    el = APR_RING_FIRST(&q->head);
+    q_expiry = el->queue_timestamp + q->timeout;
+    next_expiry = queues_next_expiry;
+    if (!next_expiry || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
+        queues_next_expiry = q_expiry;
+        /* Unblock the poll()ing listener for it to update its timeout. */
+        if (listener_is_wakeable) {
+            apr_pollset_wakeup(event_pollset);
+        }
+    }
+}
 
-#define TO_QUEUE_ELEM_INIT(el) APR_RING_ELEM_INIT(el, timeout_list)
+static void TO_QUEUE_REMOVE(struct timeout_queue *q, event_conn_state_t *el)
+{
+    APR_RING_REMOVE(el, timeout_list);
+    apr_atomic_dec32(q->total);
+    --q->count;
+}
 
-/*
- * The pollset for sockets that are in any of the timeout queues. Currently
- * we use the timeout_mutex to make sure that connections are added/removed
- * atomically to/from both event_pollset and a timeout queue. Otherwise
- * some confusion can happen under high load if timeout queues and pollset
- * get out of sync.
- * XXX: It should be possible to make the lock unnecessary in many or even all
- * XXX: cases.
- */
-static apr_pollset_t *event_pollset;
+static struct timeout_queue *TO_QUEUE_MAKE(apr_pool_t *p, apr_time_t t,
+                                           struct timeout_queue *ref)
+{
+    struct timeout_queue *q;
+
+    q = apr_pcalloc(p, sizeof *q);
+    APR_RING_INIT(&q->head, event_conn_state_t, timeout_list);
+    q->total = (ref) ? ref->total : apr_pcalloc(p, sizeof *q->total);
+    q->timeout = t;
 
+    return q;
+}
+
+#define TO_QUEUE_ELEM_INIT(el) \
+    APR_RING_ELEM_INIT((el), timeout_list)
+
 /* The structure used to pass unique initialization info to each thread */
 typedef struct
 {
@@ -474,6 +502,11 @@ static void wakeup_listener(void)
         return;
     }
 
+    /* Unblock the listener if it's poll()ing */
+    if (listener_is_wakeable) {
+        apr_pollset_wakeup(event_pollset);
+    }
+
     /* unblock the listener if it's waiting for a worker */
     ap_queue_info_term(worker_queue_info);
 
@@ -647,7 +680,11 @@ static apr_status_t decrement_connection_count(voi
         default:
             break;
     }
-    apr_atomic_dec32(&connection_count);
+    /* Unblock the listener if it's waiting for connection_count = 0 */
+    if (!apr_atomic_dec32(&connection_count)
+             && listener_is_wakeable && listener_may_exit) {
+        apr_pollset_wakeup(event_pollset);
+    }
     return APR_SUCCESS;
 }
 
@@ -810,6 +847,7 @@ static void notify_resume(event_conn_state_t *cs,
 
 static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
 {
+    int done = 0;
     apr_status_t rv;
     struct timeout_queue *q;
     apr_socket_t *csd = cs->pfd.desc.s;
@@ -842,25 +880,24 @@ static int start_lingering_close_common(event_conn
     else {
         cs->c->sbh = NULL;
     }
-    apr_thread_mutex_lock(timeout_mutex);
-    TO_QUEUE_APPEND(q, cs);
     cs->pfd.reqevents = (
             cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
                     APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
     cs->pub.sense = CONN_SENSE_DEFAULT;
+    apr_thread_mutex_lock(timeout_mutex);
     rv = apr_pollset_add(event_pollset, &cs->pfd);
+    if (rv == APR_SUCCESS || APR_STATUS_IS_EEXIST(rv)) {
+        TO_QUEUE_APPEND(q, cs);
+        done = 1;
+    }
     apr_thread_mutex_unlock(timeout_mutex);
-    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+    if (!done) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
                      "start_lingering_close: apr_pollset_add failure");
-        apr_thread_mutex_lock(timeout_mutex);
-        TO_QUEUE_REMOVE(q, cs);
-        apr_thread_mutex_unlock(timeout_mutex);
         apr_socket_close(cs->pfd.desc.s);
         ap_push_pool(worker_queue_info, cs->p);
-        return 0;
     }
-    return 1;
+    return done;
 }
 
 /*
@@ -1124,16 +1161,27 @@ read_request:
              * Set a write timeout for this connection, and let the
              * event thread poll for writeability.
              */
+            int done = 0;
             cs->queue_timestamp = apr_time_now();
             notify_suspend(cs);
-            apr_thread_mutex_lock(timeout_mutex);
-            TO_QUEUE_APPEND(cs->sc->wc_q, cs);
             cs->pfd.reqevents = (
                     cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
                             APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
             cs->pub.sense = CONN_SENSE_DEFAULT;
+            apr_thread_mutex_lock(timeout_mutex);
             rc = apr_pollset_add(event_pollset, &cs->pfd);
+            if (rc == APR_SUCCESS || APR_STATUS_IS_EEXIST(rc)) {
+                TO_QUEUE_APPEND(cs->sc->wc_q, cs);
+                done = 1;
+            }
             apr_thread_mutex_unlock(timeout_mutex);
+            if (!done) {
+                ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03465)
+                             "process_socket: apr_pollset_add failure for "
+                             "write completion");
+                apr_socket_close(cs->pfd.desc.s);
+                ap_push_pool(worker_queue_info, cs->p);
+            }
             return;
         }
         else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
@@ -1161,20 +1209,26 @@ read_request:
          * timeout today.  With a normal client, the socket will be readable in
          * a few milliseconds anyway.
          */
+        int done = 0;
         cs->queue_timestamp = apr_time_now();
         notify_suspend(cs);
-        apr_thread_mutex_lock(timeout_mutex);
-        TO_QUEUE_APPEND(cs->sc->ka_q, cs);
 
         /* Add work to pollset. */
         cs->pfd.reqevents = APR_POLLIN;
+        apr_thread_mutex_lock(timeout_mutex);
         rc = apr_pollset_add(event_pollset, &cs->pfd);
+        if (rc == APR_SUCCESS || APR_STATUS_IS_EEXIST(rc)) {
+            TO_QUEUE_APPEND(cs->sc->ka_q, cs);
+            done = 1;
+        }
         apr_thread_mutex_unlock(timeout_mutex);
-
-        if (rc != APR_SUCCESS) {
+        if (!done) {
             ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03093)
-                         "process_socket: apr_pollset_add failure");
-            AP_DEBUG_ASSERT(rc == APR_SUCCESS);
+                         "process_socket: apr_pollset_add failure for "
+                         "keep alive");
+            apr_socket_close(cs->pfd.desc.s);
+            ap_push_pool(worker_queue_info, cs->p);
+            return;
         }
     }
     else if (cs->pub.state == CONN_STATE_SUSPENDED) {
@@ -1345,7 +1399,14 @@ static void get_worker(int *have_idle_worker_p, in
 static APR_RING_HEAD(timer_free_ring_t, timer_event_t) timer_free_ring;
 
 static apr_skiplist *timer_skiplist;
+static volatile apr_time_t timers_next_expiry;
 
+/* Same goal as for TIMEOUT_FUDGE_FACTOR (avoid extra poll calls), but applied
+ * to timers. Since their timeouts are custom (user defined), we can't be too
+ * approximative here (hence using 0.01s).
+ */
+#define EVENT_FUDGE_FACTOR apr_time_from_msec(10)
+
 /* The following compare function is used by apr_skiplist_insert() to keep the
  * elements (timers) sorted and provide O(log n) complexity (this is also true
  * for apr_skiplist_{find,remove}(), but those are not used in MPM event where
@@ -1391,9 +1452,25 @@ static apr_status_t event_register_timed_callback(
     /* XXXXX: optimize */
     te->when = t + apr_time_now();
 
-    /* Okay, add sorted by when.. */
-    apr_skiplist_insert(timer_skiplist, te);
+    { 
+        apr_time_t next_expiry;
 
+        /* Okay, add sorted by when.. */
+        apr_skiplist_insert(timer_skiplist, te);
+
+        /* Cheaply update the overall timers' next expiry according to
+         * this event, if necessary.
+         */
+        next_expiry = timers_next_expiry;
+        if (!next_expiry || next_expiry > te->when + EVENT_FUDGE_FACTOR) {
+            timers_next_expiry = te->when;
+            /* Unblock the poll()ing listener for it to update its timeout. */
+            if (listener_is_wakeable) {
+                apr_pollset_wakeup(event_pollset);
+            }
+        }
+    }
+
     apr_thread_mutex_unlock(g_timer_skiplist_mtx);
 
     return APR_SUCCESS;
@@ -1427,15 +1504,14 @@ static void process_lingering_close(event_conn_sta
 
     apr_thread_mutex_lock(timeout_mutex);
     rv = apr_pollset_remove(event_pollset, pfd);
-    AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+    TO_QUEUE_REMOVE(q, cs);
+    apr_thread_mutex_unlock(timeout_mutex);
+    AP_DEBUG_ASSERT(rv == APR_SUCCESS ||  APR_STATUS_IS_NOTFOUND(rv));
+    TO_QUEUE_ELEM_INIT(cs);
 
     rv = apr_socket_close(csd);
     AP_DEBUG_ASSERT(rv == APR_SUCCESS);
 
-    TO_QUEUE_REMOVE(q, cs);
-    apr_thread_mutex_unlock(timeout_mutex);
-    TO_QUEUE_ELEM_INIT(cs);
-
     ap_push_pool(worker_queue_info, cs->p);
     if (dying)
         ap_queue_interrupt_one(worker_queue);
@@ -1449,13 +1525,13 @@ static void process_timeout_queue(struct timeout_q
                                   apr_time_t timeout_time,
                                   int (*func)(event_conn_state_t *))
 {
-    int total = 0, count;
+    apr_uint32_t total = 0, count;
     event_conn_state_t *first, *cs, *last;
     struct timeout_head_t trash;
     struct timeout_queue *qp;
     apr_status_t rv;
 
-    if (!*q->total) {
+    if (!apr_atomic_read32(q->total)) {
         return;
     }
 
@@ -1464,20 +1540,32 @@ static void process_timeout_queue(struct timeout_q
         count = 0;
         cs = first = last = APR_RING_FIRST(&qp->head);
         while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
-                                       timeout_list)
-               /* Trash the entry if:
-                * - no timeout_time was given (asked for all), or
-                * - it expired (according to the queue timeout), or
-                * - the system clock skewed in the past: no entry should be
-                *   registered above the given timeout_time (~now) + the queue
-                *   timeout, we won't keep any here (eg. for centuries).
-                * Stop otherwise, no following entry will match thanks to the
-                * single timeout per queue (entries are added to the end!).
-                * This allows maintenance in O(1).
-                */
-               && (!timeout_time
-                   || cs->queue_timestamp + qp->timeout < timeout_time
-                   || cs->queue_timestamp > timeout_time + qp->timeout)) {
+                                       timeout_list)) {
+            /* Trash the entry if:
+             * - no timeout_time was given (asked for all), or
+             * - it expired (according to the queue timeout), or
+             * - the system clock skewed in the past: no entry should be
+             *   registered above the given timeout_time (~now) + the queue
+             *   timeout, we won't keep any here (eg. for centuries).
+             *
+             * Otherwise stop, no following entry will match thanks to the
+             * single timeout per queue (entries are added to the end!).
+             * This allows maintenance in O(1).
+             */
+            if (timeout_time
+                    && cs->queue_timestamp + qp->timeout > timeout_time
+                    && cs->queue_timestamp < timeout_time + qp->timeout) {
+                /* Since this is the next expiring of this queue, update the
+                 * overall queues' next expiry if it's later than this one.
+                 */
+                apr_time_t q_expiry = cs->queue_timestamp + qp->timeout;
+                apr_time_t next_expiry = queues_next_expiry;
+                if (!next_expiry || next_expiry > q_expiry) {
+                    queues_next_expiry = q_expiry;
+                }
+                break;
+            }
+
             last = cs;
             rv = apr_pollset_remove(event_pollset, &cs->pfd);
             if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
@@ -1493,6 +1581,8 @@ static void process_timeout_queue(struct timeout_q
         APR_RING_UNSPLICE(first, last, timeout_list);
         APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t,
                              timeout_list);
+        AP_DEBUG_ASSERT(apr_atomic_read32(q->total) >= count);
+        apr_atomic_sub32(q->total, count);
         qp->count -= count;
         total += count;
     }
@@ -1499,8 +1589,6 @@ static void process_timeout_queue(struct timeout_q
     if (!total)
         return;
 
-    AP_DEBUG_ASSERT(*q->total >= total);
-    *q->total -= total;
     apr_thread_mutex_unlock(timeout_mutex);
     first = APR_RING_FIRST(&trash);
     do {
@@ -1512,13 +1600,28 @@ static void process_timeout_queue(struct timeout_q
     apr_thread_mutex_lock(timeout_mutex);
 }
 
+static void process_keepalive_queue(apr_time_t timeout_time)
+{
+    /* If all workers are busy, we kill older keep-alive connections so
+     * that they may connect to another process.
+     */
+    if (!timeout_time) {
+        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+                     "All workers are busy or dying, will close %u "
+                     "keep-alive connections",
+                     apr_atomic_read32(keepalive_q->total));
+    }
+    process_timeout_queue(keepalive_q, timeout_time,
+                          start_lingering_close_nonblocking);
+}
+
 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 {
-    timer_event_t *ep;
     timer_event_t *te;
     apr_status_t rc;
     proc_info *ti = dummy;
     int process_slot = ti->pslot;
+    struct process_score *ps = ap_get_scoreboard_process(process_slot);
     apr_pool_t *tpool = apr_thread_pool_get(thd);
     void *csd = NULL;
     apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
@@ -1534,14 +1637,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_
     last_log = apr_time_now();
     free(ti);
 
-    /* the following times out events that are really close in the future
-     *   to prevent extra poll calls
-     *
-     * current value is .1 second
-     */
-#define TIMEOUT_FUDGE_FACTOR 100000
-#define EVENT_FUDGE_FACTOR 10000
-
     rc = init_pollset(tpool);
     if (rc != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
@@ -1559,6 +1654,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_
 
     for (;;) {
         int workers_were_busy = 0;
+
         if (listener_may_exit) {
             close_listeners(process_slot, &closed);
             if (terminate_mode == ST_UNGRACEFUL
@@ -1572,7 +1668,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_
         now = apr_time_now();
         if (APLOGtrace6(ap_server_conf)) {
             /* trace log status every second */
-            if (now - last_log > apr_time_from_msec(1000)) {
+            if (now - last_log > apr_time_from_sec(1)) {
                 last_log = now;
                 apr_thread_mutex_lock(timeout_mutex);
                 ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
@@ -1580,8 +1676,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                              "keep-alive: %d lingering: %d suspended: %u)",
                              apr_atomic_read32(&connection_count),
                              apr_atomic_read32(&clogged_count),
-                             *write_completion_q->total,
-                             *keepalive_q->total,
+                             apr_atomic_read32(write_completion_q->total),
+                             apr_atomic_read32(keepalive_q->total),
                              apr_atomic_read32(&lingering_count),
                              apr_atomic_read32(&suspended_count));
                 if (dying) {
@@ -1594,32 +1690,71 @@ static void * APR_THREAD_FUNC listener_thread(apr_
             }
         }
 
-        apr_thread_mutex_lock(g_timer_skiplist_mtx);
-        te = apr_skiplist_peek(timer_skiplist);
-        if (te) {
-            if (te->when > now) {
-                timeout_interval = te->when - now;
+        /* Start with an infinite poll() timeout and update it according to
+         * the next expiring timer or queue entry. If there are none, either
+         * the listener is wakeable and it can poll() indefinitely until a wake
+         * up occurs, otherwise periodic checks (maintenance, shutdown, ...)
+         * must be performed.
+         */
+        timeout_interval = -1;
+
+        /* Push expired timers to a worker, the first remaining one determines
+         * the maximum time to poll() below, if any.
+         */
+        timeout_time = timers_next_expiry;
+        if (timeout_time && timeout_time < now + EVENT_FUDGE_FACTOR) {
+            apr_thread_mutex_lock(g_timer_skiplist_mtx);
+            while ((te = apr_skiplist_peek(timer_skiplist))) {
+                if (te->when > now + EVENT_FUDGE_FACTOR) {
+                    timers_next_expiry = te->when;
+                    timeout_interval = te->when - now;
+                    break;
+                }
+                apr_skiplist_pop(timer_skiplist, NULL);
+                push_timer2worker(te);
             }
-            else {
-                timeout_interval = 1;
+            if (!te) {
+                timers_next_expiry = 0;
             }
+            apr_thread_mutex_unlock(g_timer_skiplist_mtx);
         }
-        else {
-            timeout_interval = apr_time_from_msec(100);
+
+        /* Same for queues, use their next expiry, if any. */
+        timeout_time = queues_next_expiry;
+        if (timeout_time
+                && (timeout_interval < 0
+                    || timeout_time <= now
+                    || timeout_interval > timeout_time - now)) {
+            timeout_interval = timeout_time > now ? timeout_time - now : 1;
         }
-        apr_thread_mutex_unlock(g_timer_skiplist_mtx);
 
+        /* When non-wakeable, don't wait more than 100 ms, in any case. */
+#define NON_WAKEABLE_POLL_TIMEOUT apr_time_from_msec(100)
+        if (!listener_is_wakeable
+                && (timeout_interval < 0
+                    || timeout_interval > NON_WAKEABLE_POLL_TIMEOUT)) {
+            timeout_interval = NON_WAKEABLE_POLL_TIMEOUT;
+        }
+
         rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
         if (rc != APR_SUCCESS) {
             if (APR_STATUS_IS_EINTR(rc)) {
-                continue;
+                /* Woken up, if we are exiting we must fall through to kill
+                 * kept-alive connections, otherwise we only need to update
+                 * timeouts (logic is above, so restart the loop).
+                 */
+                if (!listener_may_exit) {
+                    continue;
+                }
+                timeout_time = 0;
             }
-            if (!APR_STATUS_IS_TIMEUP(rc)) {
+            else if (!APR_STATUS_IS_TIMEUP(rc)) {
                 ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
                              "apr_pollset_poll failed.  Attempting to "
                              "shutdown process gracefully");
                 signal_threads(ST_GRACEFUL);
             }
+            num = 0;
         }
 
         if (listener_may_exit) {
@@ -1629,21 +1764,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                 break;
         }
 
-        now = apr_time_now();
-        apr_thread_mutex_lock(g_timer_skiplist_mtx);
-        ep = apr_skiplist_peek(timer_skiplist);
-        while (ep) {
-            if (ep->when < now + EVENT_FUDGE_FACTOR) {
-                apr_skiplist_pop(timer_skiplist, NULL);
-                push_timer2worker(ep);
-            }
-            else {
-                break;
-            }
-            ep = apr_skiplist_peek(timer_skiplist);
-        }
-        apr_thread_mutex_unlock(g_timer_skiplist_mtx);
-
         while (num) {
             pt = (listener_poll_type *) out_pfd->client_data;
             if (pt->type == PT_CSD) {
@@ -1666,7 +1786,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                     TO_QUEUE_REMOVE(remove_from_q, cs);
                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
                     apr_thread_mutex_unlock(timeout_mutex);
-
                     /*
                      * Some of the pollset backends, like KQueue or Epoll
                      * automagically remove the FD if the socket is closed,
@@ -1808,52 +1927,55 @@ static void * APR_THREAD_FUNC listener_thread(apr_
         /* XXX possible optimization: stash the current time for use as
          * r->request_time for new requests
          */
-        now = apr_time_now();
-        /* We only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR), or on a clock
-         * skew (if the system time is set back in the meantime, timeout_time
-         * will exceed now + TIMEOUT_FUDGE_FACTOR, can't happen otherwise).
+        /* We process the timeout queues here only when their overall next
+         * expiry (read once above) is over. This happens accurately since
+         * adding to the queues (in workers) can only decrease this expiry,
+         * while latest ones are only taken into account here (in listener)
+         * during queues' processing, with the lock held. This works both
+         * with and without wake-ability.
          */
-        if (now > timeout_time || now + TIMEOUT_FUDGE_FACTOR < timeout_time ) {
-            struct process_score *ps;
+        if (timeout_time && timeout_time < (now = apr_time_now())) {
             timeout_time = now + TIMEOUT_FUDGE_FACTOR;
 
             /* handle timed out sockets */
             apr_thread_mutex_lock(timeout_mutex);
 
+            /* Processing all the queues below will recompute this. */
+            queues_next_expiry = 0;
+
             /* Step 1: keepalive timeouts */
-            /* If all workers are busy, we kill older keep-alive connections so that they
-             * may connect to another process.
-             */
-            if ((workers_were_busy || dying) && *keepalive_q->total) {
-                if (!dying)
-                    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
-                                 "All workers are busy, will close %d keep-alive "
-                                 "connections",
-                                 *keepalive_q->total);
-                process_timeout_queue(keepalive_q, 0,
-                                      start_lingering_close_nonblocking);
+            if (workers_were_busy || dying) {
+                process_keepalive_queue(0); /* kill'em all \m/ */
             }
             else {
-                process_timeout_queue(keepalive_q, timeout_time,
-                                      start_lingering_close_nonblocking);
+                process_keepalive_queue(timeout_time);
             }
             /* Step 2: write completion timeouts */
             process_timeout_queue(write_completion_q, timeout_time,
                                   start_lingering_close_nonblocking);
             /* Step 3: (normal) lingering close completion timeouts */
-            process_timeout_queue(linger_q, timeout_time, stop_lingering_close);
+            process_timeout_queue(linger_q, timeout_time,
+                                  stop_lingering_close);
             /* Step 4: (short) lingering close completion timeouts */
-            process_timeout_queue(short_linger_q, timeout_time, stop_lingering_close);
+            process_timeout_queue(short_linger_q, timeout_time,
+                                  stop_lingering_close);
 
-            ps = ap_get_scoreboard_process(process_slot);
-            ps->write_completion = *write_completion_q->total;
-            ps->keep_alive = *keepalive_q->total;
             apr_thread_mutex_unlock(timeout_mutex);
 
+            ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+            ps->write_completion = apr_atomic_read32(write_completion_q->total);
             ps->connections = apr_atomic_read32(&connection_count);
             ps->suspended = apr_atomic_read32(&suspended_count);
             ps->lingering_close = apr_atomic_read32(&lingering_count);
         }
+        else if ((workers_were_busy || dying)
+                 && apr_atomic_read32(keepalive_q->total)) {
+            apr_thread_mutex_lock(timeout_mutex);
+            process_keepalive_queue(0); /* kill'em all \m/ */
+            apr_thread_mutex_unlock(timeout_mutex);
+            ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+        }
+
         if (listeners_disabled && !workers_were_busy
             && (int)apr_atomic_read32(&connection_count)
                - (int)apr_atomic_read32(&lingering_count)
@@ -2064,6 +2186,8 @@ static void *APR_THREAD_FUNC start_threads(apr_thr
     int prev_threads_created;
     int max_recycled_pools = -1;
     int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
+    /* XXX don't we need more to handle K-A or lingering close? */
+    const apr_uint32_t pollset_size = threads_per_child * 2;
 
     /* We must create the fd queues before we start up the listener
      * and worker threads. */
@@ -2103,24 +2227,24 @@ static void *APR_THREAD_FUNC start_threads(apr_thr
 
     /* Create the main pollset */
     for (i = 0; i < sizeof(good_methods) / sizeof(good_methods[0]); i++) {
-        rv = apr_pollset_create_ex(&event_pollset,
-                            threads_per_child*2, /* XXX don't we need more, to handle
-                                                * connections in K-A or lingering
-                                                * close?
-                                                */
-                            pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY | APR_POLLSET_NODEFAULT,
-                            good_methods[i]);
+        apr_uint32_t flags = APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY |
+                             APR_POLLSET_NODEFAULT | APR_POLLSET_WAKEABLE;
+        rv = apr_pollset_create_ex(&event_pollset, pollset_size, pchild, flags,
+                                   good_methods[i]);
         if (rv == APR_SUCCESS) {
+            listener_is_wakeable = 1;
             break;
         }
+        flags &= ~APR_POLLSET_WAKEABLE;
+        rv = apr_pollset_create_ex(&event_pollset, pollset_size, pchild, flags,
+                                   good_methods[i]);
+        if (rv == APR_SUCCESS) {
+            break;
+        }
     }
     if (rv != APR_SUCCESS) {
-        rv = apr_pollset_create(&event_pollset,
-                               threads_per_child*2, /* XXX don't we need more, to handle
-                                                     * connections in K-A or lingering
-                                                     * close?
-                                                     */
-                               pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
+        rv = apr_pollset_create(&event_pollset, pollset_size, pchild,
+                                APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
     }
     if (rv != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03103)
@@ -2129,7 +2253,9 @@ static void *APR_THREAD_FUNC start_threads(apr_thr
     }
 
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(02471)
-                 "start_threads: Using %s", apr_pollset_method_name(event_pollset));
+                 "start_threads: Using %s (%swakeable)",
+                 apr_pollset_method_name(event_pollset),
+                 listener_is_wakeable ? "" : "not ");
     worker_sockets = apr_pcalloc(pchild, threads_per_child
                                  * sizeof(apr_socket_t *));
 
@@ -3262,10 +3388,10 @@ static int event_post_config(apr_pool_t *pconf, ap
     wc.hash = apr_hash_make(ptemp);
     ka.hash = apr_hash_make(ptemp);
 
-    TO_QUEUE_INIT(linger_q, pconf,
-                  apr_time_from_sec(MAX_SECS_TO_LINGER), NULL);
-    TO_QUEUE_INIT(short_linger_q, pconf,
-                  apr_time_from_sec(SECONDS_TO_LINGER), NULL);
+    linger_q = TO_QUEUE_MAKE(pconf, apr_time_from_sec(MAX_SECS_TO_LINGER),
+                             NULL);
+    short_linger_q = TO_QUEUE_MAKE(pconf, apr_time_from_sec(SECONDS_TO_LINGER),
+                                   NULL);
 
     for (; s; s = s->next) {
         event_srv_cfg *sc = apr_pcalloc(pconf, sizeof *sc);
@@ -3273,11 +3399,11 @@ static int event_post_config(apr_pool_t *pconf, ap
         ap_set_module_config(s->module_config, &mpm_event_module, sc);
         if (!wc.tail) {
             /* The main server uses the global queues */
-            TO_QUEUE_INIT(wc.q, pconf, s->timeout, NULL);
+            wc.q = TO_QUEUE_MAKE(pconf, s->timeout, NULL);
             apr_hash_set(wc.hash, &s->timeout, sizeof s->timeout, wc.q);
             wc.tail = write_completion_q = wc.q;
 
-            TO_QUEUE_INIT(ka.q, pconf, s->keep_alive_timeout, NULL);
+            ka.q = TO_QUEUE_MAKE(pconf, s->keep_alive_timeout, NULL);
             apr_hash_set(ka.hash, &s->keep_alive_timeout,
                          sizeof s->keep_alive_timeout, ka.q);
             ka.tail = keepalive_q = ka.q;
@@ -3287,7 +3413,7 @@ static int event_post_config(apr_pool_t *pconf, ap
              * or their own queue(s) if there isn't */
             wc.q = apr_hash_get(wc.hash, &s->timeout, sizeof s->timeout);
             if (!wc.q) {
-                TO_QUEUE_INIT(wc.q, pconf, s->timeout, wc.tail);
+                wc.q = TO_QUEUE_MAKE(pconf, s->timeout, wc.tail);
                 apr_hash_set(wc.hash, &s->timeout, sizeof s->timeout, wc.q);
                 wc.tail = wc.tail->next = wc.q;
             }
@@ -3295,7 +3421,7 @@ static int event_post_config(apr_pool_t *pconf, ap
             ka.q = apr_hash_get(ka.hash, &s->keep_alive_timeout,
                                 sizeof s->keep_alive_timeout);
             if (!ka.q) {
-                TO_QUEUE_INIT(ka.q, pconf, s->keep_alive_timeout, ka.tail);
+                ka.q = TO_QUEUE_MAKE(pconf, s->keep_alive_timeout, ka.tail);
                 apr_hash_set(ka.hash, &s->keep_alive_timeout,
                              sizeof s->keep_alive_timeout, ka.q);
                 ka.tail = ka.tail->next = ka.q;
Index: server/mpm/event/fdqueue.c
===================================================================
--- server/mpm/event/fdqueue.c  (revision 1780129)
+++ server/mpm/event/fdqueue.c  (working copy)
@@ -269,9 +269,8 @@ void ap_pop_pool(apr_pool_t ** recycled_pool, fd_q
         if (first_pool == NULL) {
             break;
         }
-        if (apr_atomic_casptr
-            ((void*) &(queue_info->recycled_pools),
-             first_pool->next, first_pool) == first_pool) {
+        if (apr_atomic_casptr((void*) &(queue_info->recycled_pools),
+                              first_pool->next, first_pool) == first_pool) {
             *recycled_pool = first_pool->pool;
             if (queue_info->max_recycled_pools >= 0)
                 apr_atomic_dec32(&queue_info->recycled_pools_count);
