On Fri, Jul 14, 2017 at 9:52 PM, Yann Ylavic <[email protected]> wrote:
>
> So overall, this patch may introduce the need for more workers than
> before, what was (wrongly) done by the listener thread has to be done
> somewhere anyway...
That patch didn't work (as reported by Stefan Priebe), and I don't
feel the need to debug it further; see below.
>
> Finally, I think there is room for improvements like batching
> shutdowns in the same worker if there is no objection on the approach
> so far.
That's the way to go IMO; here is a new patch which is much simpler
and, I think, effective.
The idea is that when nonblocking is required (i.e. in the listener),
connections to flush and close are pushed onto a chain (linked list)
by the listener and popped from it by some worker, both atomically.
So start_lingering_close_nonblocking() simply fills the chain (this is
atomic/nonblocking), and any worker thread which is done with its
current connection will empty the chain, calling
start_lingering_close_blocking() on each connection.
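For clarity, here is a minimal, self-contained sketch of that chain (a
classic CAS-based LIFO). conn_t, chain_push() and chain_pop() are
made-up names for the example; only the apr_atomic_casptr() usage
mirrors the patch below:

#include <stdio.h>
#include <apr_general.h>
#include <apr_atomic.h>

typedef struct conn_t {
    int id;                /* stands in for the real connection state */
    struct conn_t *chain;  /* next entry, as in event_conn_state_t */
} conn_t;

static conn_t *volatile chain_head = NULL;

/* Listener side: push cs at the head of the chain (nonblocking). */
static void chain_push(conn_t *cs)
{
    conn_t *head;
    for (;;) {
        head = cs->chain = chain_head;
        if (apr_atomic_casptr((void *)&chain_head, cs, head) == head) {
            break;  /* cs is now the head */
        }
        /* lost the race against another push/pop, try again */
    }
}

/* Worker side: pop the head of the chain, or NULL if it's empty. */
static conn_t *chain_pop(void)
{
    conn_t *cs;
    for (;;) {
        cs = chain_head;
        if (!cs) {
            return NULL;
        }
        if (apr_atomic_casptr((void *)&chain_head, cs->chain, cs) == cs) {
            cs->chain = NULL;
            return cs;
        }
        /* head changed under us, try again */
    }
}

int main(void)
{
    conn_t a = { 1, NULL }, b = { 2, NULL };
    conn_t *cs;

    apr_initialize();
    chain_push(&a);
    chain_push(&b);
    while ((cs = chain_pop()) != NULL) {
        printf("popped %d\n", cs->id);  /* 2 then 1 (LIFO order) */
    }
    apr_terminate();
    return 0;
}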
To prevent starvation of deferred lingering closes, the listener may
create a worker at the end of its loop, when/if the chain is filled.
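Concretely, that end-of-loop check (from one of the event.c hunks
below) boils down to:

    if (defer_linger_chain) {
        get_worker(&have_idle_worker, 0, &workers_were_busy);
        if (have_idle_worker && push2worker(NULL) == APR_SUCCESS) {
            have_idle_worker = 0;
        }
    }

where push2worker(NULL) queues no particular connection and merely
wakes up a worker so that it drains the chain.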
While the previous patch potentially induced some overhead in the
number of workers and in thread context switches, I think this new one
is much better in this regard.
What do you think of it?
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c (revision 1802058)
+++ server/mpm/event/event.c (working copy)
@@ -219,6 +219,12 @@ static apr_pollfd_t *listener_pollfd;
*/
static apr_pollset_t *event_pollset;
+/*
+ * The chain of connections to be shutdown by a worker thread (deferred),
+ * linked list updated atomically.
+ */
+static event_conn_state_t *volatile defer_linger_chain = NULL;
+
struct event_conn_state_t {
/** APR_RING of expiration timeouts */
APR_RING_ENTRY(event_conn_state_t) timeout_list;
@@ -243,7 +249,10 @@ struct event_conn_state_t {
apr_pollfd_t pfd;
/** public parts of the connection state */
conn_state_t pub;
+ /** chaining in defer_linger_chain */
+ struct event_conn_state_t *chain;
};
+
APR_RING_HEAD(timeout_head_t, event_conn_state_t);
struct timeout_queue {
@@ -805,31 +814,27 @@ static int start_lingering_close_blocking(event_co
}
/*
- * Close our side of the connection, NOT flushing data to the client.
- * This should only be called if there has been an error or if we know
- * that our send buffers are empty.
+ * Defer flush and close of the connection by adding it to defer_linger_chain,
+ * for a worker to grab it and do the job (should that be blocking).
* Pre-condition: cs is not in any timeout queue and not in the pollset,
* timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- * 1 if connection is lingering
- * may be called by listener thread
+ * return: 1 connection is alive (but aside and about to linger)
+ * May be called by listener thread.
*/
static int start_lingering_close_nonblocking(event_conn_state_t *cs)
{
- conn_rec *c = cs->c;
- apr_socket_t *csd = cs->pfd.desc.s;
-
- if (ap_prep_lingering_close(c)
- || c->aborted
- || ap_shutdown_conn(c, 0) != APR_SUCCESS || c->aborted
- || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS) {
- apr_socket_close(csd);
- ap_push_pool(worker_queue_info, cs->p);
- if (dying)
- ap_queue_interrupt_one(worker_queue);
- return 0;
+ event_conn_state_t *chain;
+ for (;;) {
+ /* Atomically exchange defer_linger_chain with cs which is
+ * the new head (i.e. chained to previous defer_linger_chain).
+ */
+ chain = cs->chain = defer_linger_chain;
+ if (apr_atomic_casptr((void *)&defer_linger_chain,
+ cs, chain) == chain) {
+ break;
+ }
}
- return start_lingering_close_common(cs, 0);
+ return 1;
}
/*
@@ -837,6 +842,8 @@ static int start_lingering_close_nonblocking(event
* expired
* Pre-condition: cs is not in any timeout queue and not in the pollset
* return: irrelevant (need same prototype as start_lingering_close)
+ * Note: since lingering, the socket timeout is set to zero, hence we are
+ * nonblocking here.
*/
static int stop_lingering_close(event_conn_state_t *cs)
{
@@ -1287,26 +1294,31 @@ static apr_status_t push_timer2worker(timer_event_
}
/*
- * Pre-condition: pfd->cs is neither in pollset nor timeout queue
+ * Pre-condition: cs is neither in event_pollset nor a timeout queue
* this function may only be called by the listener
*/
-static apr_status_t push2worker(const apr_pollfd_t * pfd,
- apr_pollset_t * pollset)
+static apr_status_t push2worker(event_conn_state_t *cs)
{
- listener_poll_type *pt = (listener_poll_type *) pfd->client_data;
- event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
apr_status_t rc;
- rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p);
+ if (cs) {
+ rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p);
+ }
+ else {
+ rc = ap_queue_push(worker_queue, NULL, NULL, NULL);
+ }
if (rc != APR_SUCCESS) {
- /* trash the connection; we couldn't queue the connected
- * socket to a worker
- */
- apr_bucket_alloc_destroy(cs->bucket_alloc);
- apr_socket_close(cs->pfd.desc.s);
- ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
- ap_server_conf, APLOGNO(00471) "push2worker: ap_queue_push failed");
- ap_push_pool(worker_queue_info, cs->p);
+ if (cs) {
+ /* trash the connection; we couldn't queue the connected
+ * socket to a worker
+ */
+ apr_bucket_alloc_destroy(cs->bucket_alloc);
+ apr_socket_close(cs->pfd.desc.s);
+ ap_push_pool(worker_queue_info, cs->p);
+ }
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, APLOGNO(00471)
+ "push2worker: ap_queue_push failed");
+ signal_threads(ST_GRACEFUL);
}
return rc;
@@ -1861,6 +1873,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_
TO_QUEUE_REMOVE(remove_from_q, cs);
rc = apr_pollset_remove(event_pollset, &cs->pfd);
apr_thread_mutex_unlock(timeout_mutex);
+ TO_QUEUE_ELEM_INIT(cs);
+
/*
* Some of the pollset backends, like KQueue or Epoll
* automagically remove the FD if the socket is closed,
@@ -1874,7 +1888,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_
break;
}
- TO_QUEUE_ELEM_INIT(cs);
/* If we didn't get a worker immediately for a keep-alive
* request, we close the connection, so that the client can
* re-connect to a different process.
@@ -1881,22 +1894,17 @@ static void * APR_THREAD_FUNC listener_thread(apr_
*/
if (!have_idle_worker) {
start_lingering_close_nonblocking(cs);
- break;
}
- rc = push2worker(out_pfd, event_pollset);
- if (rc != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
- ap_server_conf, APLOGNO(03095)
- "push2worker failed");
- }
- else {
+ else if (push2worker(cs) == APR_SUCCESS) {
have_idle_worker = 0;
}
break;
+
case CONN_STATE_LINGER_NORMAL:
case CONN_STATE_LINGER_SHORT:
process_lingering_close(cs, out_pfd);
break;
+
default:
ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
ap_server_conf, APLOGNO(03096)
@@ -2090,6 +2098,22 @@ static void * APR_THREAD_FUNC listener_thread(apr_
ps->keep_alive = 0;
}
+ /* If there are some lingering closes to defer (to a worker), schedule
+ * them now. We might wakeup a worker spuriously if another one empties
+ * defer_linger_chain in the meantime, but there also may be no active
+ * or all busy workers for an undefined time. In any case a deferred
+ * lingering close can't starve if we do that here since the chain is
+ * filled only above in the listener and it's emptied only in the
+ * worker(s); thus a NULL here means it will stay so while the listener
+ * waits (possibly indefinitely) in poll().
+ */
+ if (defer_linger_chain) {
+ get_worker(&have_idle_worker, 0, &workers_were_busy);
+ if (have_idle_worker && push2worker(NULL) == APR_SUCCESS) {
+ have_idle_worker = 0;
+ }
+ }
+
if (listeners_disabled && !workers_were_busy
&& ((c_count = apr_atomic_read32(&connection_count))
>= (l_count = apr_atomic_read32(&lingering_count))
@@ -2167,7 +2191,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thr
while (!workers_may_exit) {
apr_socket_t *csd = NULL;
- event_conn_state_t *cs;
+ event_conn_state_t *cs = NULL;
timer_event_t *te = NULL;
apr_pool_t *ptrans; /* Pool for per-transaction stuff */
@@ -2233,12 +2257,33 @@ static void *APR_THREAD_FUNC worker_thread(apr_thr
apr_thread_mutex_unlock(g_timer_skiplist_mtx);
}
}
- else {
+ else if (ptrans != NULL) {
is_idle = 0;
worker_sockets[thread_slot] = csd;
process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
worker_sockets[thread_slot] = NULL;
}
+
+ /* If there are deferred shutdowns, handle them now. */
+ for (;;) {
+ cs = defer_linger_chain;
+ if (!cs) {
+ break;
+ }
+
+ /* Atomically exchange defer_linger_chain with its chained
+ * entry (next), we can then shutdown the removed one (in cs).
+ */
+ if (apr_atomic_casptr((void *)&defer_linger_chain,
+ cs->chain, cs) == cs) {
+ cs->chain = NULL;
+ cs->pub.state = CONN_STATE_LINGER;
+ csd = ap_get_conn_socket(cs->c);
+ worker_sockets[thread_slot] = csd;
+ start_lingering_close_blocking(cs);
+ worker_sockets[thread_slot] = NULL;
+ }
+ }
}
ap_update_child_status_from_indexes(process_slot, thread_slot,