Hi Stefan,

On Tue, Jan 24, 2017 at 1:37 PM, Stefan Eissing <stefan.eiss...@greenbytes.de> wrote:
> Yann, thanks for the patch. I agree that the cleanups need to be killed in
> the right place. Not certain if it was wrong before, but that part is not
> easy to see for every combination.
>
> I did some rework and hope this makes it more readable. If you find the time
> to look at it, feedback welcome.
I still fear that if beam->pool gets destroyed while both beam_send_cleanup()
and beam_cleanup() are registered, the former is called twice.

I'd change:

    if (safe_send) {
        if (beam->send_pool && beam->send_pool != beam->pool) {
            apr_pool_cleanup_kill(beam->send_pool, beam, beam_send_cleanup);
        }
        status = beam_send_cleanup(beam);
    }

with:

    if (safe_send) {
        if (beam->send_pool) {
            if (beam->send_pool != beam->pool) {
                apr_pool_cleanup_kill(beam->send_pool, beam, beam_send_cleanup);
            }
            status = beam_send_cleanup(beam);
        }
    }

since in the above case beam_send_cleanup is run first and sets send_pool=NULL.

Attached v3 with only this change w.r.t. v2.

Otherwise, looks good to me, thanks!
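For reference, here is a small standalone sketch of that double-run hazard,
with made-up names (beam_like_t, send_cleanup(), owner_cleanup()) rather than
the actual h2 code: two pre-cleanups registered on the same pool run in
reverse registration order, so the sender-side cleanup runs first and NULLs
its pool pointer, and the owner's cleanup has to re-check that pointer before
invoking it by hand.

    /* Hypothetical illustration only, not the h2 code. */
    #include <stdio.h>
    #include <apr_general.h>
    #include <apr_pools.h>

    typedef struct {
        apr_pool_t *pool;       /* owner pool */
        apr_pool_t *send_pool;  /* here the same pool; may also be a sub-pool */
        int send_cleanups;      /* how many times send_cleanup() ran */
    } beam_like_t;

    static apr_status_t send_cleanup(void *data)
    {
        beam_like_t *b = data;
        b->send_cleanups++;
        b->send_pool = NULL;    /* the sending side is gone now */
        return APR_SUCCESS;
    }

    static apr_status_t owner_cleanup(void *data)
    {
        beam_like_t *b = data;
        if (b->send_pool) {     /* may already be NULL if send_cleanup() ran */
            if (b->send_pool != b->pool) {
                apr_pool_cleanup_kill(b->send_pool, b, send_cleanup);
            }
            return send_cleanup(b);
        }
        return APR_SUCCESS;
    }

    int main(void)
    {
        apr_pool_t *pool;
        beam_like_t b = { NULL, NULL, 0 };

        apr_initialize();
        apr_pool_create(&pool, NULL);
        b.pool = b.send_pool = pool;    /* sender uses the owner's pool */

        /* Pre-cleanups run in reverse registration order on destroy, so
         * send_cleanup() runs before owner_cleanup() here. */
        apr_pool_pre_cleanup_register(pool, &b, owner_cleanup);
        apr_pool_pre_cleanup_register(pool, &b, send_cleanup);

        apr_pool_destroy(pool);
        printf("send_cleanup ran %d time(s)\n", b.send_cleanups);  /* 1, not 2 */
        apr_terminate();
        return 0;
    }

If owner_cleanup() called send_cleanup() unconditionally, without re-checking
send_pool (the v2 shape), the counter would read 2 instead of 1.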
Index: modules/http2/h2_bucket_beam.c
===================================================================
--- modules/http2/h2_bucket_beam.c	(revision 1780129)
+++ modules/http2/h2_bucket_beam.c	(working copy)
@@ -438,18 +438,37 @@ static apr_status_t beam_recv_cleanup(void *data)
     return APR_SUCCESS;
 }
 
+static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool,
+                         apr_status_t (*cleanup)(void *))
+{
+    if (pool && pool != beam->pool) {
+        apr_pool_pre_cleanup_register(pool, beam, cleanup);
+        return 1;
+    }
+    return 0;
+}
+
+static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
+                     apr_status_t (*cleanup)(void *)) {
+    if (pool && pool != beam->pool) {
+        apr_pool_cleanup_kill(pool, beam, cleanup);
+        return 1;
+    }
+    return 0;
+}
+
 static void beam_set_recv_pool(h2_bucket_beam *beam, apr_pool_t *pool) 
 {
-    /* if the beam owner is the sender, monitor receiver pool lifetime */ 
-    if (beam->owner == H2_BEAM_OWNER_SEND && beam->recv_pool != pool) {
-        if (beam->recv_pool) {
-            apr_pool_cleanup_kill(beam->recv_pool, beam, beam_recv_cleanup);
-        }
-        beam->recv_pool = pool;
-        if (beam->recv_pool) {
-            apr_pool_pre_cleanup_register(beam->recv_pool, beam, beam_recv_cleanup);
-        }
+    if (beam->recv_pool == pool || 
+        (beam->recv_pool && pool
+         && apr_pool_is_ancestor(beam->recv_pool, pool))) {
+        /* when receiver same or sub-pool of existing, stick
+         * to the the pool we already have. */
+        return;
     }
+    pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
+    beam->recv_pool = pool;
+    pool_register(beam, beam->recv_pool, beam_recv_cleanup);
 }
 
 static apr_status_t beam_send_cleanup(void *data)
@@ -473,22 +492,16 @@ static apr_status_t beam_send_cleanup(void *data)
 
 static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) 
 {
-    /* if the beam owner is the receiver, monitor sender pool lifetime */ 
-    if (beam->owner == H2_BEAM_OWNER_RECV && beam->send_pool != pool) { 
-        if (beam->send_pool && pool 
-            && apr_pool_is_ancestor(beam->send_pool, pool)) {
-            /* when sender uses sub-pools to transmit data, stick
-             * to the lifetime of the pool we already have. */
-            return;
-        }
-        if (beam->send_pool) {
-            apr_pool_cleanup_kill(beam->send_pool, beam, beam_send_cleanup);
-        }
-        beam->send_pool = pool;
-        if (beam->send_pool) {
-            apr_pool_pre_cleanup_register(beam->send_pool, beam, beam_send_cleanup);
-        }
+    if (beam->send_pool == pool || 
+        (beam->send_pool && pool
+         && apr_pool_is_ancestor(beam->send_pool, pool))) {
+        /* when sender is same or sub-pool of existing, stick
+         * to the the pool we already have. */
+        return;
     }
+    pool_kill(beam, beam->send_pool, beam_send_cleanup);
+    beam->send_pool = pool;
+    pool_register(beam, beam->send_pool, beam_send_cleanup);
 }
 
 static apr_status_t beam_cleanup(void *data)
@@ -495,44 +508,57 @@ static apr_status_t beam_cleanup(void *data)
 {
     h2_bucket_beam *beam = data;
     apr_status_t status = APR_SUCCESS;
-    /* owner of the beam is going away, depending on its role, cleanup
-     * strategies differ. */
-    beam_close(beam);
-    switch (beam->owner) {
-        case H2_BEAM_OWNER_SEND:
-            status = beam_send_cleanup(beam);
-            beam->recv_buffer = NULL;
+    int safe_send = !beam->m_enter || (beam->owner == H2_BEAM_OWNER_SEND);
+    int safe_recv = !beam->m_enter || (beam->owner == H2_BEAM_OWNER_RECV);
+
+    /* 
+     * Owner of the beam is going away, depending on which side it owns,
+     * cleanup strategies will differ with multi-thread protection
+     * still in place (beam->m_enter).
+     *
+     * In general, receiver holds references to memory from sender. 
+     * Clean up receiver first, if safe, then cleanup sender, if safe.
+     */
+
+    /* When modify send is not safe, this means we still have multi-thread
+     * protection and the owner is receiving the buckets. If the sending
+     * side has not gone away, this means we could have dangling buckets
+     * in our lists that never get destroyed. This should not happen. */
+    ap_assert(safe_send || !beam->send_pool);
+    if (!H2_BLIST_EMPTY(&beam->send_list)) {
+        ap_assert(beam->send_pool);
+    }
+
+    if (safe_recv) {
+        if (beam->recv_pool) {
+            apr_pool_cleanup_kill(beam->recv_pool, beam, beam_recv_cleanup);
             beam->recv_pool = NULL;
-            break;
-        case H2_BEAM_OWNER_RECV:
-            if (beam->recv_buffer) {
-                apr_brigade_destroy(beam->recv_buffer);
-            }
+        }
+        if (beam->recv_buffer) {
+            apr_brigade_destroy(beam->recv_buffer);
             beam->recv_buffer = NULL;
-            beam->recv_pool = NULL;
-            if (!H2_BLIST_EMPTY(&beam->send_list)) {
-                ap_assert(beam->send_pool);
-            }
-            if (beam->send_pool) {
-                /* sender has not cleaned up, its pool still lives.
-                 * this is normal if the sender uses cleanup via a bucket
-                 * such as the BUCKET_EOR for requests. In that case, the
-                 * beam should have lost its mutex protection, meaning
-                 * it is no longer used multi-threaded and we can safely
-                 * purge all remaining sender buckets. */
+        }
+    }
+    else {
+        beam->recv_buffer = NULL;
+        beam->recv_pool = NULL;
+    }
+
+    if (safe_send) {
+        if (beam->send_pool) {
+            if (beam->send_pool != beam->pool) {
                 apr_pool_cleanup_kill(beam->send_pool, beam, beam_send_cleanup);
-                ap_assert(!beam->m_enter);
-                beam_send_cleanup(beam);
             }
-            ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
-            ap_assert(H2_BLIST_EMPTY(&beam->send_list));
-            ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
-            ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
-            break;
-        default:
-            ap_assert(NULL);
-            break;
+            status = beam_send_cleanup(beam);
+        }
     }
+
+    if (safe_recv) {
+        ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
+        ap_assert(H2_BLIST_EMPTY(&beam->send_list));
+        ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
+        ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
+    }
     return status;
 }
Index: modules/proxy/mod_proxy.h
===================================================================
--- modules/proxy/mod_proxy.h	(revision 1780129)
+++ modules/proxy/mod_proxy.h	(working copy)
@@ -347,9 +347,9 @@ PROXY_WORKER_HC_FAIL )
 #define PROXY_WORKER_MAX_SCHEME_SIZE   16
 #define PROXY_WORKER_MAX_ROUTE_SIZE    64
 #define PROXY_BALANCER_MAX_ROUTE_SIZE PROXY_WORKER_MAX_ROUTE_SIZE
-#define PROXY_WORKER_MAX_NAME_SIZE     96
+#define PROXY_WORKER_MAX_NAME_SIZE     544
 #define PROXY_BALANCER_MAX_NAME_SIZE  PROXY_WORKER_MAX_NAME_SIZE
-#define PROXY_WORKER_MAX_HOSTNAME_SIZE 64
+#define PROXY_WORKER_MAX_HOSTNAME_SIZE 512
 #define PROXY_BALANCER_MAX_HOSTNAME_SIZE PROXY_WORKER_MAX_HOSTNAME_SIZE
 #define PROXY_BALANCER_MAX_STICKY_SIZE  64
Index: modules/slotmem/mod_slotmem_shm.c
===================================================================
--- modules/slotmem/mod_slotmem_shm.c	(revision 1780129)
+++ modules/slotmem/mod_slotmem_shm.c	(working copy)
@@ -386,22 +386,26 @@ static apr_status_t slotmem_create(ap_slotmem_inst
         if (apr_shm_size_get(shm) != size) {
             apr_shm_detach(shm);
             ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf, APLOGNO(02599)
-                         "existing shared memory for %s could not be used (failed size check)",
+                         "existing shared memory for %s could not be reused (failed size check)",
                          fname);
-            return APR_EINVAL;
+            rv = APR_EINVAL;
         }
-        ptr = (char *)apr_shm_baseaddr_get(shm);
-        memcpy(&desc, ptr, sizeof(desc));
-        if (desc.size != item_size || desc.num != item_num) {
-            apr_shm_detach(shm);
-            ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf, APLOGNO(02600)
-                         "existing shared memory for %s could not be used (failed contents check)",
-                         fname);
-            return APR_EINVAL;
+        else {
+            ptr = (char *)apr_shm_baseaddr_get(shm);
+            memcpy(&desc, ptr, sizeof(desc));
+            if (desc.size != item_size || desc.num != item_num) {
+                apr_shm_detach(shm);
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf, APLOGNO(02600)
+                             "existing shared memory for %s could not be reused (failed contents check)",
+                             fname);
+                rv = APR_EINVAL;
+            }
+            else {
+                ptr += AP_SLOTMEM_OFFSET;
+            }
         }
-        ptr += AP_SLOTMEM_OFFSET;
     }
-    else {
+    if (rv != APR_SUCCESS) {
         apr_size_t dsize = size - AP_SLOTMEM_OFFSET;
         if (fbased) {
             apr_shm_remove(fname, gpool);
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c	(revision 1780129)
+++ server/mpm/event/event.c	(working copy)
@@ -177,6 +177,7 @@ static int dying = 0;
 static int workers_may_exit = 0;
 static int start_thread_may_exit = 0;
 static int listener_may_exit = 0;
+static int listener_is_wakeable = 0;        /* Pollset supports APR_POLLSET_WAKEABLE */
 static int num_listensocks = 0;
 static apr_int32_t conns_this_child;        /* MaxConnectionsPerChild, only access in listener thread */
 
@@ -199,6 +200,19 @@ module AP_MODULE_DECLARE_DATA mpm_event_module;
 struct event_srv_cfg_s;
 typedef struct event_srv_cfg_s event_srv_cfg;
 
+static apr_pollfd_t *listener_pollfd;
+
+/*
+ * The pollset for sockets that are in any of the timeout queues. Currently
+ * we use the timeout_mutex to make sure that connections are added/removed
+ * atomically to/from both event_pollset and a timeout queue. Otherwise
+ * some confusion can happen under high load if timeout queues and pollset
+ * get out of sync.
+ * XXX: It should be possible to make the lock unnecessary in many or even all
+ * XXX: cases.
+ */
+static apr_pollset_t *event_pollset;
+
 struct event_conn_state_t {
     /** APR_RING of expiration timeouts */
     APR_RING_ENTRY(event_conn_state_t) timeout_list;
@@ -228,9 +242,10 @@ APR_RING_HEAD(timeout_head_t, event_conn_state_t);
 struct timeout_queue {
     struct timeout_head_t head;
-    int count, *total;
     apr_interval_time_t timeout;
-    struct timeout_queue *next;
+    apr_uint32_t count;         /* for this queue */
+    apr_uint32_t *total;        /* for all chained/related queues */
+    struct timeout_queue *next; /* chaining */
 };
 /*
  * Several timeout queues that use different timeouts, so that we always can
@@ -244,52 +259,65 @@ static struct timeout_queue *write_completion_q,
                             *keepalive_q,
                             *linger_q,
                             *short_linger_q;
+static volatile apr_time_t queues_next_expiry;
 
-static apr_pollfd_t *listener_pollfd;
+/* Prevent extra poll/wakeup calls for timeouts close in the future (queues
+ * have the granularity of a second anyway).
+ * XXX: Wouldn't 0.5s (instead of 0.1s) be "enough"?
+ */
+#define TIMEOUT_FUDGE_FACTOR apr_time_from_msec(100)
 
 /*
  * Macros for accessing struct timeout_queue.
  * For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held. 
 */
-#define TO_QUEUE_APPEND(q, el)                                            \
-    do {                                                                  \
-        APR_RING_INSERT_TAIL(&(q)->head, el, event_conn_state_t,          \
-                             timeout_list);                               \
-        ++*(q)->total;                                                    \
-        ++(q)->count;                                                     \
-    } while (0)
+static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
+{
+    apr_time_t q_expiry;
+    apr_time_t next_expiry;
 
-#define TO_QUEUE_REMOVE(q, el)             \
-    do {                                   \
-        APR_RING_REMOVE(el, timeout_list); \
-        --*(q)->total;                     \
-        --(q)->count;                      \
-    } while (0)
+    APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
+    apr_atomic_inc32(q->total);
+    ++q->count;
 
-#define TO_QUEUE_INIT(q, p, t, v)                                             \
-    do {                                                                      \
-        struct timeout_queue *b = (v);                                        \
-        (q) = apr_palloc((p), sizeof *(q));                                   \
-        APR_RING_INIT(&(q)->head, event_conn_state_t, timeout_list);          \
-        (q)->total = (b) ? (b)->total : apr_pcalloc((p), sizeof *(q)->total); \
-        (q)->count = 0;                                                       \
-        (q)->timeout = (t);                                                   \
-        (q)->next = NULL;                                                     \
-    } while (0)
+    /* Cheaply update the overall queues' next expiry according to the
+     * first entry of this queue (oldest), if necessary.
+     */
+    el = APR_RING_FIRST(&q->head);
+    q_expiry = el->queue_timestamp + q->timeout;
+    next_expiry = queues_next_expiry;
+    if (!next_expiry || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
+        queues_next_expiry = q_expiry;
+        /* Unblock the poll()ing listener for it to update its timeout. */
+        if (listener_is_wakeable) {
+            apr_pollset_wakeup(event_pollset);
+        }
+    }
+}
 
-#define TO_QUEUE_ELEM_INIT(el) APR_RING_ELEM_INIT(el, timeout_list)
+static void TO_QUEUE_REMOVE(struct timeout_queue *q, event_conn_state_t *el)
+{
+    APR_RING_REMOVE(el, timeout_list);
+    apr_atomic_dec32(q->total);
+    --q->count;
+}
 
-/*
- * The pollset for sockets that are in any of the timeout queues. Currently
- * we use the timeout_mutex to make sure that connections are added/removed
- * atomically to/from both event_pollset and a timeout queue. Otherwise
- * some confusion can happen under high load if timeout queues and pollset
- * get out of sync.
- * XXX: It should be possible to make the lock unnecessary in many or even all
- * XXX: cases.
- */
-static apr_pollset_t *event_pollset;
+static struct timeout_queue *TO_QUEUE_MAKE(apr_pool_t *p, apr_time_t t,
+                                           struct timeout_queue *ref)
+{
+    struct timeout_queue *q;
+
+    q = apr_pcalloc(p, sizeof *q);
+    APR_RING_INIT(&q->head, event_conn_state_t, timeout_list);
+    q->total = (ref) ? ref->total : apr_pcalloc(p, sizeof *q->total);
+    q->timeout = t;
+    return q;
+}
+
+#define TO_QUEUE_ELEM_INIT(el) \
+    APR_RING_ELEM_INIT((el), timeout_list)
+
 
 /* The structure used to pass unique initialization info to each thread */
 typedef struct
 {
@@ -474,6 +502,11 @@ static void wakeup_listener(void)
         return;
     }
 
+    /* Unblock the listener if it's poll()ing */
+    if (listener_is_wakeable) {
+        apr_pollset_wakeup(event_pollset);
+    }
+
     /* unblock the listener if it's waiting for a worker */
     ap_queue_info_term(worker_queue_info);
 
@@ -647,7 +680,11 @@ static apr_status_t decrement_connection_count(voi
     default:
         break;
     }
-    apr_atomic_dec32(&connection_count);
+    /* Unblock the listener if it's waiting for connection_count = 0 */
+    if (!apr_atomic_dec32(&connection_count)
+        && listener_is_wakeable && listener_may_exit) {
+        apr_pollset_wakeup(event_pollset);
+    }
     return APR_SUCCESS;
 }
 
@@ -810,6 +847,7 @@ static void notify_resume(event_conn_state_t *cs,
 
 static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
 {
+    int done = 0;
     apr_status_t rv;
     struct timeout_queue *q;
     apr_socket_t *csd = cs->pfd.desc.s;
@@ -842,25 +880,24 @@ static int start_lingering_close_common(event_conn
     else {
        cs->c->sbh = NULL;
    }
-    apr_thread_mutex_lock(timeout_mutex);
-    TO_QUEUE_APPEND(q, cs);
     cs->pfd.reqevents = (
             cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
                     APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
     cs->pub.sense = CONN_SENSE_DEFAULT;
+    apr_thread_mutex_lock(timeout_mutex);
     rv = apr_pollset_add(event_pollset, &cs->pfd);
+    if (rv == APR_SUCCESS || APR_STATUS_IS_EEXIST(rv)) {
+        TO_QUEUE_APPEND(q, cs);
+        done = 1;
+    }
     apr_thread_mutex_unlock(timeout_mutex);
-    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+    if (!done) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
                      "start_lingering_close: apr_pollset_add failure");
-        apr_thread_mutex_lock(timeout_mutex);
-        TO_QUEUE_REMOVE(q, cs);
-        apr_thread_mutex_unlock(timeout_mutex);
         apr_socket_close(cs->pfd.desc.s);
         ap_push_pool(worker_queue_info, cs->p);
-        return 0;
     }
-    return 1;
+    return done;
 }
 
 /*
@@ -1124,16 +1161,27 @@ read_request:
              * Set a write timeout for this connection, and let the
             * event thread poll for writeability.
             */
+            int done = 0;
             cs->queue_timestamp = apr_time_now();
             notify_suspend(cs);
-            apr_thread_mutex_lock(timeout_mutex);
-            TO_QUEUE_APPEND(cs->sc->wc_q, cs);
             cs->pfd.reqevents = (
                     cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
                             APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
             cs->pub.sense = CONN_SENSE_DEFAULT;
+            apr_thread_mutex_lock(timeout_mutex);
             rc = apr_pollset_add(event_pollset, &cs->pfd);
+            if (rc == APR_SUCCESS || APR_STATUS_IS_EEXIST(rc)) {
+                TO_QUEUE_APPEND(cs->sc->wc_q, cs);
+                done = 1;
+            }
             apr_thread_mutex_unlock(timeout_mutex);
+            if (!done) {
+                ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03465)
+                             "process_socket: apr_pollset_add failure for "
+                             "write completion");
+                apr_socket_close(cs->pfd.desc.s);
+                ap_push_pool(worker_queue_info, cs->p);
+            }
             return;
         }
         else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
@@ -1161,20 +1209,26 @@ read_request:
          * timeout today. With a normal client, the socket will be readable in
          * a few milliseconds anyway.
          */
+        int done = 0;
        cs->queue_timestamp = apr_time_now();
        notify_suspend(cs);
-        apr_thread_mutex_lock(timeout_mutex);
-        TO_QUEUE_APPEND(cs->sc->ka_q, cs);
 
         /* Add work to pollset. */
        cs->pfd.reqevents = APR_POLLIN;
+        apr_thread_mutex_lock(timeout_mutex);
        rc = apr_pollset_add(event_pollset, &cs->pfd);
+        if (rc == APR_SUCCESS || APR_STATUS_IS_EEXIST(rc)) {
+            TO_QUEUE_APPEND(cs->sc->ka_q, cs);
+            done = 1;
+        }
        apr_thread_mutex_unlock(timeout_mutex);
-
-        if (rc != APR_SUCCESS) {
+        if (!done) {
             ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03093)
-                         "process_socket: apr_pollset_add failure");
-            AP_DEBUG_ASSERT(rc == APR_SUCCESS);
+                         "process_socket: apr_pollset_add failure for "
+                         "keep alive");
+            apr_socket_close(cs->pfd.desc.s);
+            ap_push_pool(worker_queue_info, cs->p);
+            return;
         }
     }
     else if (cs->pub.state == CONN_STATE_SUSPENDED) {
@@ -1345,7 +1399,14 @@ static void get_worker(int *have_idle_worker_p, in
 static APR_RING_HEAD(timer_free_ring_t, timer_event_t) timer_free_ring;
 
 static apr_skiplist *timer_skiplist;
+static volatile apr_time_t timers_next_expiry;
 
+/* Same goal as for TIMEOUT_FUDGE_FACTOR (avoid extra poll calls), but applied
+ * to timers. Since their timeouts are custom (user defined), we can't be too
+ * approximative here (hence using 0.01s).
+ */
+#define EVENT_FUDGE_FACTOR apr_time_from_msec(10)
+
 /* The following compare function is used by apr_skiplist_insert() to keep the
  * elements (timers) sorted and provide O(log n) complexity (this is also true
  * for apr_skiplist_{find,remove}(), but those are not used in MPM event where
@@ -1391,9 +1452,25 @@ static apr_status_t event_register_timed_callback(
     /* XXXXX: optimize */
     te->when = t + apr_time_now();
 
-    /* Okay, add sorted by when.. */
-    apr_skiplist_insert(timer_skiplist, te);
+    {
+        apr_time_t next_expiry;
+        /* Okay, add sorted by when.. */
+        apr_skiplist_insert(timer_skiplist, te);
+
+        /* Cheaply update the overall timers' next expiry according to
+         * this event, if necessary.
+         */
+        next_expiry = timers_next_expiry;
+        if (!next_expiry || next_expiry > te->when + EVENT_FUDGE_FACTOR) {
+            timers_next_expiry = te->when;
+            /* Unblock the poll()ing listener for it to update its timeout. */
+            if (listener_is_wakeable) {
+                apr_pollset_wakeup(event_pollset);
+            }
+        }
+    }
+
     apr_thread_mutex_unlock(g_timer_skiplist_mtx);
 
     return APR_SUCCESS;
@@ -1427,15 +1504,14 @@ static void process_lingering_close(event_conn_sta
 
     apr_thread_mutex_lock(timeout_mutex);
     rv = apr_pollset_remove(event_pollset, pfd);
-    AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+    TO_QUEUE_REMOVE(q, cs);
+    apr_thread_mutex_unlock(timeout_mutex);
+    AP_DEBUG_ASSERT(rv == APR_SUCCESS || APR_STATUS_IS_NOTFOUND(rv));
+    TO_QUEUE_ELEM_INIT(cs);
 
     rv = apr_socket_close(csd);
     AP_DEBUG_ASSERT(rv == APR_SUCCESS);
 
-    TO_QUEUE_REMOVE(q, cs);
-    apr_thread_mutex_unlock(timeout_mutex);
-    TO_QUEUE_ELEM_INIT(cs);
-
     ap_push_pool(worker_queue_info, cs->p);
     if (dying)
         ap_queue_interrupt_one(worker_queue);
@@ -1449,13 +1525,13 @@ static void process_timeout_queue(struct timeout_q
                                   apr_time_t timeout_time,
                                   int (*func)(event_conn_state_t *))
 {
-    int total = 0, count;
+    apr_uint32_t total = 0, count;
     event_conn_state_t *first, *cs, *last;
     struct timeout_head_t trash;
     struct timeout_queue *qp;
     apr_status_t rv;
 
-    if (!*q->total) {
+    if (!apr_atomic_read32(q->total)) {
         return;
     }
 
@@ -1464,20 +1540,32 @@ static void process_timeout_queue(struct timeout_q
         count = 0;
         cs = first = last = APR_RING_FIRST(&qp->head);
         while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
-                                       timeout_list)
-               /* Trash the entry if:
-                * - no timeout_time was given (asked for all), or
-                * - it expired (according to the queue timeout), or
-                * - the system clock skewed in the past: no entry should be
-                *   registered above the given timeout_time (~now) + the queue
-                *   timeout, we won't keep any here (eg. for centuries).
-                * Stop otherwise, no following entry will match thanks to the
-                * single timeout per queue (entries are added to the end!).
-                * This allows maintenance in O(1).
-                */
-               && (!timeout_time
-                   || cs->queue_timestamp + qp->timeout < timeout_time
-                   || cs->queue_timestamp > timeout_time + qp->timeout)) {
+                                       timeout_list)) {
+            /* Trash the entry if:
+             * - no timeout_time was given (asked for all), or
+             * - it expired (according to the queue timeout), or
+             * - the system clock skewed in the past: no entry should be
+             *   registered above the given timeout_time (~now) + the queue
+             *   timeout, we won't keep any here (eg. for centuries).
+             *
+             * Otherwise stop, no following entry will match thanks to the
+             * single timeout per queue (entries are added to the end!).
+             * This allows maintenance in O(1).
+             */
+            if (timeout_time
+                && cs->queue_timestamp + qp->timeout > timeout_time
+                && cs->queue_timestamp < timeout_time + qp->timeout) {
+                /* Since this is the next expiring of this queue, update the
+                 * overall queues' next expiry if it's later than this one. 
+                 */
+                apr_time_t q_expiry = cs->queue_timestamp + qp->timeout;
+                apr_time_t next_expiry = queues_next_expiry;
+                if (!next_expiry || next_expiry > q_expiry) {
+                    queues_next_expiry = q_expiry;
+                }
+                break;
+            }
+
             last = cs;
             rv = apr_pollset_remove(event_pollset, &cs->pfd);
             if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
@@ -1493,6 +1581,8 @@ static void process_timeout_queue(struct timeout_q
             APR_RING_UNSPLICE(first, last, timeout_list);
             APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t,
                                  timeout_list);
+            AP_DEBUG_ASSERT(apr_atomic_read32(q->total) >= count);
+            apr_atomic_sub32(q->total, count);
             qp->count -= count;
             total += count;
         }
@@ -1499,8 +1589,6 @@ static void process_timeout_queue(struct timeout_q
     if (!total)
         return;
 
-    AP_DEBUG_ASSERT(*q->total >= total);
-    *q->total -= total;
     apr_thread_mutex_unlock(timeout_mutex);
     first = APR_RING_FIRST(&trash);
     do {
@@ -1512,13 +1600,28 @@ static void process_timeout_queue(struct timeout_q
     apr_thread_mutex_lock(timeout_mutex);
 }
 
+static void process_keepalive_queue(apr_time_t timeout_time)
+{
+    /* If all workers are busy, we kill older keep-alive connections so
+     * that they may connect to another process.
+     */
+    if (!timeout_time) {
+        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+                     "All workers are busy or dying, will close %u "
+                     "keep-alive connections",
+                     apr_atomic_read32(keepalive_q->total));
+    }
+    process_timeout_queue(keepalive_q, timeout_time,
+                          start_lingering_close_nonblocking);
+}
+
 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 {
-    timer_event_t *ep;
     timer_event_t *te;
     apr_status_t rc;
     proc_info *ti = dummy;
     int process_slot = ti->pslot;
+    struct process_score *ps = ap_get_scoreboard_process(process_slot);
     apr_pool_t *tpool = apr_thread_pool_get(thd);
     void *csd = NULL;
     apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
@@ -1534,14 +1637,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_
     last_log = apr_time_now();
     free(ti);
 
-    /* the following times out events that are really close in the future
-     * to prevent extra poll calls
-     *
-     * current value is .1 second
-     */
-#define TIMEOUT_FUDGE_FACTOR 100000
-#define EVENT_FUDGE_FACTOR 10000
-
     rc = init_pollset(tpool);
     if (rc != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
@@ -1559,6 +1654,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_
 
     for (;;) {
         int workers_were_busy = 0;
+
         if (listener_may_exit) {
             close_listeners(process_slot, &closed);
             if (terminate_mode == ST_UNGRACEFUL
@@ -1572,7 +1668,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_
         now = apr_time_now();
         if (APLOGtrace6(ap_server_conf)) {
             /* trace log status every second */
-            if (now - last_log > apr_time_from_msec(1000)) {
+            if (now - last_log > apr_time_from_sec(1)) {
                 last_log = now;
                 apr_thread_mutex_lock(timeout_mutex);
                 ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
@@ -1580,8 +1676,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                              "keep-alive: %d lingering: %d suspended: %u)",
                              apr_atomic_read32(&connection_count),
                              apr_atomic_read32(&clogged_count),
-                             *write_completion_q->total,
-                             *keepalive_q->total,
+                             apr_atomic_read32(write_completion_q->total),
+                             apr_atomic_read32(keepalive_q->total),
                              apr_atomic_read32(&lingering_count),
                              apr_atomic_read32(&suspended_count));
                 if (dying) {
@@ -1594,32 +1690,71 @@ static void * APR_THREAD_FUNC listener_thread(apr_
             }
         }
 
-        apr_thread_mutex_lock(g_timer_skiplist_mtx);
-        te = apr_skiplist_peek(timer_skiplist);
-        if (te) {
-            if (te->when > now) {
-                timeout_interval = te->when - now;
+        /* Start with an infinite poll() timeout and update it according to
+         * the next expiring timer or queue entry. If there are none, either
+         * the listener is wakeable and it can poll() indefinitely until a wake
+         * up occurs, otherwise periodic checks (maintenance, shutdown, ...)
+         * must be performed.
+         */
+        timeout_interval = -1;
+
+        /* Push expired timers to a worker, the first remaining one determines
+         * the maximum time to poll() below, if any.
+         */
+        timeout_time = timers_next_expiry;
+        if (timeout_time && timeout_time < now + EVENT_FUDGE_FACTOR) {
+            apr_thread_mutex_lock(g_timer_skiplist_mtx);
+            while ((te = apr_skiplist_peek(timer_skiplist))) {
+                if (te->when > now + EVENT_FUDGE_FACTOR) {
+                    timers_next_expiry = te->when;
+                    timeout_interval = te->when - now;
+                    break;
+                }
+                apr_skiplist_pop(timer_skiplist, NULL);
+                push_timer2worker(te);
             }
-            else {
-                timeout_interval = 1;
+            if (!te) {
+                timers_next_expiry = 0;
             }
+            apr_thread_mutex_unlock(g_timer_skiplist_mtx);
         }
-        else {
-            timeout_interval = apr_time_from_msec(100);
+
+        /* Same for queues, use their next expiry, if any. */
+        timeout_time = queues_next_expiry;
+        if (timeout_time
+            && (timeout_interval < 0
+                || timeout_time <= now
+                || timeout_interval > timeout_time - now)) {
+            timeout_interval = timeout_time > now ? timeout_time - now : 1;
         }
-        apr_thread_mutex_unlock(g_timer_skiplist_mtx);
 
+        /* When non-wakeable, don't wait more than 100 ms, in any case. */
+#define NON_WAKEABLE_POLL_TIMEOUT apr_time_from_msec(100)
+        if (!listener_is_wakeable
+            && (timeout_interval < 0
+                || timeout_interval > NON_WAKEABLE_POLL_TIMEOUT)) {
+            timeout_interval = NON_WAKEABLE_POLL_TIMEOUT;
+        }
+
         rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
         if (rc != APR_SUCCESS) {
             if (APR_STATUS_IS_EINTR(rc)) {
-                continue;
+                /* Woken up, if we are exiting we must fall through to kill
+                 * kept-alive connections, otherwise we only need to update
+                 * timeouts (logic is above, so restart the loop).
+                 */
+                if (!listener_may_exit) {
+                    continue;
+                }
+                timeout_time = 0;
             }
-            if (!APR_STATUS_IS_TIMEUP(rc)) {
+            else if (!APR_STATUS_IS_TIMEUP(rc)) {
                 ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
                              "apr_pollset_poll failed. Attempting to "
                              "shutdown process gracefully");
                 signal_threads(ST_GRACEFUL);
             }
+            num = 0;
         }
 
         if (listener_may_exit) {
@@ -1629,21 +1764,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_
             break;
         }
 
-        now = apr_time_now();
-        apr_thread_mutex_lock(g_timer_skiplist_mtx);
-        ep = apr_skiplist_peek(timer_skiplist);
-        while (ep) {
-            if (ep->when < now + EVENT_FUDGE_FACTOR) {
-                apr_skiplist_pop(timer_skiplist, NULL);
-                push_timer2worker(ep);
-            }
-            else {
-                break;
-            }
-            ep = apr_skiplist_peek(timer_skiplist);
-        }
-        apr_thread_mutex_unlock(g_timer_skiplist_mtx);
-
         while (num) {
             pt = (listener_poll_type *) out_pfd->client_data;
             if (pt->type == PT_CSD) {
@@ -1666,7 +1786,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                         TO_QUEUE_REMOVE(remove_from_q, cs);
                         rc = apr_pollset_remove(event_pollset, &cs->pfd);
                         apr_thread_mutex_unlock(timeout_mutex);
-
                         /*
                          * Some of the pollset backends, like KQueue or Epoll
                          * automagically remove the FD if the socket is closed,
@@ -1808,52 +1927,55 @@ static void * APR_THREAD_FUNC listener_thread(apr_
         /* XXX possible optimization: stash the current time for use as
          * r->request_time for new requests
          */
-        now = apr_time_now();
-        /* We only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR), or on a clock
-         * skew (if the system time is set back in the meantime, timeout_time
-         * will exceed now + TIMEOUT_FUDGE_FACTOR, can't happen otherwise).
+        /* We process the timeout queues here only when their overall next
+         * expiry (read once above) is over. This happens accurately since
+         * adding to the queues (in workers) can only decrease this expiry,
+         * while latest ones are only taken into account here (in listener)
+         * during queues' processing, with the lock held. This works both
+         * with and without wake-ability.
          */
-        if (now > timeout_time || now + TIMEOUT_FUDGE_FACTOR < timeout_time ) {
-            struct process_score *ps;
+        if (timeout_time && timeout_time < (now = apr_time_now())) {
             timeout_time = now + TIMEOUT_FUDGE_FACTOR;
 
             /* handle timed out sockets */
             apr_thread_mutex_lock(timeout_mutex);
 
+            /* Processing all the queues below will recompute this. */
+            queues_next_expiry = 0;
+
             /* Step 1: keepalive timeouts */
-            /* If all workers are busy, we kill older keep-alive connections so that they
-             * may connect to another process. 
-             */
-            if ((workers_were_busy || dying) && *keepalive_q->total) {
-                if (!dying)
-                    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
-                                 "All workers are busy, will close %d keep-alive "
-                                 "connections",
-                                 *keepalive_q->total);
-                process_timeout_queue(keepalive_q, 0,
-                                      start_lingering_close_nonblocking);
+            if (workers_were_busy || dying) {
+                process_keepalive_queue(0); /* kill'em all \m/ */
             }
             else {
-                process_timeout_queue(keepalive_q, timeout_time,
-                                      start_lingering_close_nonblocking);
+                process_keepalive_queue(timeout_time);
             }
             /* Step 2: write completion timeouts */
             process_timeout_queue(write_completion_q, timeout_time,
                                   start_lingering_close_nonblocking);
             /* Step 3: (normal) lingering close completion timeouts */
-            process_timeout_queue(linger_q, timeout_time, stop_lingering_close);
+            process_timeout_queue(linger_q, timeout_time,
+                                  stop_lingering_close);
             /* Step 4: (short) lingering close completion timeouts */
-            process_timeout_queue(short_linger_q, timeout_time, stop_lingering_close);
+            process_timeout_queue(short_linger_q, timeout_time,
+                                  stop_lingering_close);
 
-            ps = ap_get_scoreboard_process(process_slot);
-            ps->write_completion = *write_completion_q->total;
-            ps->keep_alive = *keepalive_q->total;
             apr_thread_mutex_unlock(timeout_mutex);
+            ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+            ps->write_completion = apr_atomic_read32(write_completion_q->total);
             ps->connections = apr_atomic_read32(&connection_count);
             ps->suspended = apr_atomic_read32(&suspended_count);
             ps->lingering_close = apr_atomic_read32(&lingering_count);
         }
+        else if ((workers_were_busy || dying)
+                 && apr_atomic_read32(keepalive_q->total)) {
+            apr_thread_mutex_lock(timeout_mutex);
+            process_keepalive_queue(0); /* kill'em all \m/ */
+            apr_thread_mutex_unlock(timeout_mutex);
+            ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+        }
+
         if (listeners_disabled && !workers_were_busy
             && (int)apr_atomic_read32(&connection_count)
                - (int)apr_atomic_read32(&lingering_count)
@@ -2064,6 +2186,8 @@ static void *APR_THREAD_FUNC start_threads(apr_thr
     int prev_threads_created;
     int max_recycled_pools = -1;
     int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
+    /* XXX don't we need more to handle K-A or lingering close? */
+    const apr_uint32_t pollset_size = threads_per_child * 2;
 
     /* We must create the fd queues before we start up the listener
      * and worker threads. */
@@ -2103,24 +2227,24 @@ static void *APR_THREAD_FUNC start_threads(apr_thr
 
     /* Create the main pollset */
     for (i = 0; i < sizeof(good_methods) / sizeof(good_methods[0]); i++) {
-        rv = apr_pollset_create_ex(&event_pollset,
-                            threads_per_child*2, /* XXX don't we need more, to handle
-                                                  * connections in K-A or lingering
-                                                  * close?
-                                                  */
-                            pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY | APR_POLLSET_NODEFAULT,
-                            good_methods[i]);
+        apr_uint32_t flags = APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY |
+                             APR_POLLSET_NODEFAULT | APR_POLLSET_WAKEABLE;
+        rv = apr_pollset_create_ex(&event_pollset, pollset_size, pchild, flags,
+                                   good_methods[i]);
         if (rv == APR_SUCCESS) {
+            listener_is_wakeable = 1;
             break;
         }
+        flags &= ~APR_POLLSET_WAKEABLE;
+        rv = apr_pollset_create_ex(&event_pollset, pollset_size, pchild, flags,
+                                   good_methods[i]);
+        if (rv == APR_SUCCESS) {
+            break;
+        }
     }
     if (rv != APR_SUCCESS) {
-        rv = apr_pollset_create(&event_pollset,
-                               threads_per_child*2, /* XXX don't we need more, to handle
-                                                     * connections in K-A or lingering
-                                                     * close? 
-                                                     */
-                               pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
+        rv = apr_pollset_create(&event_pollset, pollset_size, pchild,
+                                APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
     }
     if (rv != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03103)
@@ -2129,7 +2253,9 @@ static void *APR_THREAD_FUNC start_threads(apr_thr
     }
 
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(02471)
-                 "start_threads: Using %s", apr_pollset_method_name(event_pollset));
+                 "start_threads: Using %s (%swakeable)",
+                 apr_pollset_method_name(event_pollset),
+                 listener_is_wakeable ? "" : "not ");
     worker_sockets = apr_pcalloc(pchild, threads_per_child
                                  * sizeof(apr_socket_t *));
@@ -3262,10 +3388,10 @@ static int event_post_config(apr_pool_t *pconf, ap
     wc.hash = apr_hash_make(ptemp);
     ka.hash = apr_hash_make(ptemp);
 
-    TO_QUEUE_INIT(linger_q, pconf,
-                  apr_time_from_sec(MAX_SECS_TO_LINGER), NULL);
-    TO_QUEUE_INIT(short_linger_q, pconf,
-                  apr_time_from_sec(SECONDS_TO_LINGER), NULL);
+    linger_q = TO_QUEUE_MAKE(pconf, apr_time_from_sec(MAX_SECS_TO_LINGER),
+                             NULL);
+    short_linger_q = TO_QUEUE_MAKE(pconf, apr_time_from_sec(SECONDS_TO_LINGER),
+                                   NULL);
 
     for (; s; s = s->next) {
         event_srv_cfg *sc = apr_pcalloc(pconf, sizeof *sc);
@@ -3273,11 +3399,11 @@ static int event_post_config(apr_pool_t *pconf, ap
         ap_set_module_config(s->module_config, &mpm_event_module, sc);
         if (!wc.tail) {
             /* The main server uses the global queues */
-            TO_QUEUE_INIT(wc.q, pconf, s->timeout, NULL);
+            wc.q = TO_QUEUE_MAKE(pconf, s->timeout, NULL);
             apr_hash_set(wc.hash, &s->timeout, sizeof s->timeout, wc.q);
             wc.tail = write_completion_q = wc.q;
 
-            TO_QUEUE_INIT(ka.q, pconf, s->keep_alive_timeout, NULL);
+            ka.q = TO_QUEUE_MAKE(pconf, s->keep_alive_timeout, NULL);
             apr_hash_set(ka.hash, &s->keep_alive_timeout,
                          sizeof s->keep_alive_timeout, ka.q);
             ka.tail = keepalive_q = ka.q;
@@ -3287,7 +3413,7 @@ static int event_post_config(apr_pool_t *pconf, ap
              * or their own queue(s) if there isn't */
            wc.q = apr_hash_get(wc.hash, &s->timeout, sizeof s->timeout);
            if (!wc.q) {
-                TO_QUEUE_INIT(wc.q, pconf, s->timeout, wc.tail);
+                wc.q = TO_QUEUE_MAKE(pconf, s->timeout, wc.tail);
                 apr_hash_set(wc.hash, &s->timeout, sizeof s->timeout, wc.q);
                 wc.tail = wc.tail->next = wc.q;
             }
@@ -3295,7 +3421,7 @@ static int event_post_config(apr_pool_t *pconf, ap
             ka.q = apr_hash_get(ka.hash, &s->keep_alive_timeout,
                                 sizeof s->keep_alive_timeout);
             if (!ka.q) {
-                TO_QUEUE_INIT(ka.q, pconf, s->keep_alive_timeout, ka.tail);
+                ka.q = TO_QUEUE_MAKE(pconf, s->keep_alive_timeout, ka.tail);
                 apr_hash_set(ka.hash, &s->keep_alive_timeout,
                              sizeof s->keep_alive_timeout, ka.q);
                 ka.tail = ka.tail->next = ka.q;
Index: server/mpm/event/fdqueue.c
===================================================================
--- server/mpm/event/fdqueue.c	(revision 1780129)
+++ server/mpm/event/fdqueue.c	(working copy)
@@ -269,9 +269,8 @@ void ap_pop_pool(apr_pool_t ** recycled_pool, fd_q
         if (first_pool == NULL) {
             break;
         }
-        if (apr_atomic_casptr
-            ((void*) &(queue_info->recycled_pools),
-             first_pool->next, first_pool) == first_pool) {
+        if (apr_atomic_casptr((void*) &(queue_info->recycled_pools),
+                              first_pool->next, first_pool) == first_pool) {
             *recycled_pool = first_pool->pool;
             if (queue_info->max_recycled_pools >= 0)
                 apr_atomic_dec32(&queue_info->recycled_pools_count);